#![allow(dead_code)]
use crate::config::ShardexConfig;
use crate::constants::magic;
use crate::error::ShardexError;
use crate::memory::{FileHeader, MemoryMappedFile};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};
/// Tunable settings controlling how integrity validation and recovery behave.
///
/// Defaults are provided by the `Default` impl below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityConfig {
/// Interval between periodic validation runs; `None` disables periodic validation.
pub periodic_validation_interval: Option<Duration>,
/// Corruption tolerance threshold (0.0 by default). Not referenced in this
/// chunk — presumably a 0.0..=1.0 ratio; confirm against the consumer.
pub corruption_tolerance: f64,
/// Whether recovery from detected corruption is allowed.
pub enable_recovery: bool,
/// Upper bound on recovery attempts (3 by default).
pub max_recovery_attempts: usize,
/// Opt-in flag for more detailed analysis; not referenced in this chunk.
pub detailed_analysis: bool,
/// Chunk size in bytes used during validation (1 MiB by default).
pub validation_chunk_size: usize,
/// Whether cross-component validation is enabled.
pub enable_cross_validation: bool,
}
impl Default for IntegrityConfig {
fn default() -> Self {
Self {
periodic_validation_interval: Some(Duration::from_secs(600)), corruption_tolerance: 0.0, enable_recovery: true,
max_recovery_attempts: 3,
detailed_analysis: false,
validation_chunk_size: 1024 * 1024, enable_cross_validation: false,
}
}
}
/// Category of detected corruption; drives priority/difficulty triage in
/// `IntegrityIssue`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum CorruptionType {
/// File header is invalid or unreadable.
HeaderCorruption,
/// Payload data fails validation or quality checks.
DataCorruption,
/// File is shorter than its header claims.
FileTruncation,
/// Internal structure of a file is inconsistent.
StructuralInconsistency,
/// Components disagree with each other (e.g. missing counterpart files).
CrossValidationFailure,
/// Only part of the data is affected.
PartialCorruption,
}
/// Identifies which part of the index a validation result applies to.
/// `Copy + Hash + Eq` so it can key the per-component result maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ComponentType {
/// `*.vectors` files (see `discover_components`).
VectorStorage,
/// `*.postings` files.
PostingStorage,
/// `wal/*.log` segment files.
WalSegments,
/// Bloom filter files; no discovery pattern is registered in this chunk.
BloomFilters,
/// `centroids/*.shx` files.
ShardexSegments,
/// Pseudo-component for cross-file consistency checks (no files of its own).
CrossReferences,
}
/// Detailed description of a single detected corruption.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CorruptionReport {
/// Category of the corruption.
pub corruption_type: CorruptionType,
/// File the corruption was found in.
pub file_path: PathBuf,
/// Byte offset of the corrupt region, when known.
pub corruption_offset: Option<u64>,
/// Size in bytes of the corrupt region, when known.
pub corruption_size: Option<u64>,
/// Human-readable explanation of what was found.
pub description: String,
/// Suggested remediation steps, most relevant first.
pub recovery_recommendations: Vec<String>,
/// Severity on a 0.0..=1.0 scale; elsewhere in this file >=0.8 is treated
/// as critical and >0.9 as blocking.
pub severity: f64,
/// Whether automated recovery is considered possible.
pub is_recoverable: bool,
/// Wall-clock time the corruption was detected.
pub detected_at: SystemTime,
}
/// Outcome of validating one file or one component.
///
/// Fields are private; constructed via `ValidationResult::success` /
/// `ValidationResult::failure` and read via `is_valid()` /
/// `corruption_report()` (impl not visible in this chunk).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResult {
// True when no corruption was detected.
is_valid: bool,
// Populated with the corruption details when `is_valid` is false.
corruption_report: Option<CorruptionReport>,
// Time spent performing this validation.
validation_time: Duration,
// Number of bytes examined.
bytes_validated: u64,
// Checksum of the validated data; XOR-combined across files per component.
data_checksum: u32,
}
/// Aggregated result of a full or incremental integrity run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityReport {
/// True only if every component passed and no cross-reference issues exist.
pub is_valid: bool,
/// Per-component validation outcomes.
pub component_results: HashMap<ComponentType, ValidationResult>,
/// Issues found by cross-component consistency checks.
pub cross_reference_issues: Vec<CorruptionReport>,
/// Total validation time; overwritten with wall-clock elapsed time by the
/// verify_* entry points after all components finish.
pub total_validation_time: Duration,
/// Sum of bytes examined across all components.
pub total_bytes_validated: u64,
/// When the report was created.
pub validated_at: SystemTime,
/// Human-readable summary; filled in by `generate_summary`.
pub summary: String,
}
/// Aggregated outcome of a batch of repair attempts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairReport {
/// Number of issues a repair was attempted for.
pub issues_attempted: usize,
/// Number of successful repairs.
pub issues_repaired: usize,
/// Number of failed repairs.
pub issues_failed: usize,
/// Per-issue repair outcomes, in the order they were attempted.
pub repair_results: Vec<RepairResult>,
/// Total time spent across all repair attempts.
pub total_repair_time: Duration,
/// False if any critical issue (severity >= 0.8) failed to repair.
pub all_critical_resolved: bool,
}
/// Outcome of a single repair attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairResult {
/// The corruption this attempt addressed.
pub issue: CorruptionReport,
/// Whether the repair succeeded.
pub success: bool,
/// Description of what was done.
pub action_taken: String,
/// Time spent on this attempt.
pub repair_time: Duration,
/// Additional free-form notes.
pub notes: Vec<String>,
}
/// A corruption report enriched with triage metadata; see
/// `IntegrityIssue::from_corruption` for how the fields are derived.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IntegrityIssue {
/// The underlying corruption.
pub corruption: CorruptionReport,
/// Urgency bucket: 1 is most urgent; values 1-4 are assigned in this file.
pub priority: u8,
/// True when the issue should block normal operation.
pub blocking: bool,
/// Estimated repair difficulty; values 2-5 are assigned in this file
/// (higher = harder).
pub repair_difficulty: u8,
/// True when the issue can be repaired automatically.
pub auto_repairable: bool,
}
/// Running counters for integrity activity; all fields start at zero/None
/// via `Default`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IntegrityStats {
/// Total validations performed.
pub total_validations: u64,
/// Total corruptions detected.
pub corruptions_detected: u64,
/// Total successful recoveries.
pub successful_recoveries: u64,
/// Cumulative time spent validating.
pub total_validation_time: Duration,
/// Cumulative bytes validated.
pub total_bytes_validated: u64,
/// Time of the most recent validation, if any.
pub last_validation: Option<SystemTime>,
}
/// Low-level validation engine used by `IntegrityChecker`. Its methods
/// (`new`, `validate_file`, `validate_file_path`) are defined elsewhere in
/// this file.
pub struct IntegrityManager {
// Active validation configuration.
config: IntegrityConfig,
// Running counters for validations performed by this manager.
stats: IntegrityStats,
// Last validation time per file path.
validation_history: HashMap<PathBuf, SystemTime>,
// Per-file monitoring state (checksums, failure streaks).
monitored_files: HashMap<PathBuf, FileMonitoringState>,
}
/// High-level integrity checker for an index directory: discovers component
/// files, validates each component, and runs cross-component checks.
pub struct IntegrityChecker {
// Root directory of the index being checked.
index_directory: PathBuf,
// Index configuration (not read directly in this chunk).
config: ShardexConfig,
// When false (see `new_read_only`), detected issues are reported as
// non-recoverable instead of being repaired.
repair_enabled: bool,
// Underlying validation engine.
manager: IntegrityManager,
// Discovered files per component; populated by `discover_components`.
component_paths: HashMap<ComponentType, Vec<PathBuf>>,
}
/// Per-file monitoring state kept by `IntegrityManager`; not read or written
/// anywhere in this chunk.
#[derive(Debug, Clone)]
struct FileMonitoringState {
// Checksum observed at the last validation.
last_known_checksum: u32,
// When the file was last validated.
last_validation: SystemTime,
// Number of consecutive failed validations.
consecutive_failures: usize,
// Current health verdict for the file.
is_healthy: bool,
}
/// `Default` simply delegates to [`IntegrityReport::new`], which produces an
/// empty, initially-valid report.
impl Default for IntegrityReport {
fn default() -> Self {
Self::new()
}
}
impl IntegrityReport {
    /// Creates an empty report that starts out valid with no results.
    pub fn new() -> Self {
        Self {
            is_valid: true,
            component_results: HashMap::new(),
            cross_reference_issues: Vec::new(),
            total_validation_time: Duration::ZERO,
            total_bytes_validated: 0,
            validated_at: SystemTime::now(),
            summary: String::new(),
        }
    }

    /// Records one component's outcome: folds its time and byte totals into
    /// the report and invalidates the report if the component failed.
    pub fn add_component_result(&mut self, component: ComponentType, result: ValidationResult) {
        self.total_validation_time += result.validation_time;
        self.total_bytes_validated += result.bytes_validated;
        // A single failing component makes the whole report invalid.
        self.is_valid &= result.is_valid;
        self.component_results.insert(component, result);
    }

    /// Appends a cross-reference issue; any such issue invalidates the report.
    pub fn add_cross_reference_issue(&mut self, issue: CorruptionReport) {
        self.cross_reference_issues.push(issue);
        self.is_valid = false;
    }

    /// Rebuilds the human-readable `summary` from the accumulated results.
    pub fn generate_summary(&mut self) {
        let total = self.component_results.len();
        let elapsed = self.total_validation_time.as_secs_f64();
        self.summary = if self.is_valid {
            format!(
                "All {} components validated successfully. {} bytes checked in {:.2}s.",
                total, self.total_bytes_validated, elapsed
            )
        } else {
            let failed = self
                .component_results
                .values()
                .filter(|r| !r.is_valid)
                .count();
            format!(
                "{}/{} components failed validation, {} cross-reference issues detected. {} bytes checked in {:.2}s.",
                failed,
                total,
                self.cross_reference_issues.len(),
                self.total_bytes_validated,
                elapsed
            )
        };
    }
}
/// `Default` simply delegates to [`RepairReport::new`], which produces an
/// empty report with zeroed counters.
impl Default for RepairReport {
fn default() -> Self {
Self::new()
}
}
impl RepairReport {
    /// Creates an empty repair report with zeroed counters; with no failures
    /// recorded yet, `all_critical_resolved` starts true.
    pub fn new() -> Self {
        Self {
            issues_attempted: 0,
            issues_repaired: 0,
            issues_failed: 0,
            repair_results: Vec::new(),
            total_repair_time: Duration::ZERO,
            all_critical_resolved: true,
        }
    }

    /// Folds one repair outcome into the aggregate counters. A failed repair
    /// of a high-severity issue (severity >= 0.8) clears
    /// `all_critical_resolved`.
    pub fn add_repair_result(&mut self, result: RepairResult) {
        self.issues_attempted += 1;
        self.total_repair_time += result.repair_time;
        match result.success {
            true => self.issues_repaired += 1,
            false => {
                self.issues_failed += 1;
                if result.issue.severity >= 0.8 {
                    self.all_critical_resolved = false;
                }
            }
        }
        self.repair_results.push(result);
    }

    /// Fraction of attempted repairs that succeeded; 0.0 when none were
    /// attempted.
    pub fn success_rate(&self) -> f64 {
        match self.issues_attempted {
            0 => 0.0,
            n => self.issues_repaired as f64 / n as f64,
        }
    }
}
impl IntegrityIssue {
pub fn from_corruption(corruption: CorruptionReport) -> Self {
let priority = Self::calculate_priority(&corruption);
let blocking = Self::is_blocking(&corruption);
let repair_difficulty = Self::estimate_repair_difficulty(&corruption);
let auto_repairable = Self::is_auto_repairable(&corruption);
Self {
corruption,
priority,
blocking,
repair_difficulty,
auto_repairable,
}
}
fn calculate_priority(corruption: &CorruptionReport) -> u8 {
match corruption.corruption_type {
CorruptionType::HeaderCorruption => 1, CorruptionType::StructuralInconsistency => 1, CorruptionType::CrossValidationFailure => 2, CorruptionType::DataCorruption => {
if corruption.severity > 0.8 {
1
} else {
2
}
}
CorruptionType::FileTruncation => 2, CorruptionType::PartialCorruption => {
if corruption.severity > 0.5 {
3
} else {
4
}
}
}
}
fn is_blocking(corruption: &CorruptionReport) -> bool {
matches!(
corruption.corruption_type,
CorruptionType::HeaderCorruption | CorruptionType::StructuralInconsistency
) || corruption.severity > 0.9
}
fn estimate_repair_difficulty(corruption: &CorruptionReport) -> u8 {
match corruption.corruption_type {
CorruptionType::HeaderCorruption => 5, CorruptionType::StructuralInconsistency => 4, CorruptionType::CrossValidationFailure => 3, CorruptionType::DataCorruption => 2, CorruptionType::FileTruncation => 5, CorruptionType::PartialCorruption => 2, }
}
fn is_auto_repairable(corruption: &CorruptionReport) -> bool {
matches!(
corruption.corruption_type,
CorruptionType::DataCorruption | CorruptionType::PartialCorruption | CorruptionType::CrossValidationFailure
) && corruption.is_recoverable
}
}
impl IntegrityChecker {
/// Creates a checker for `index_directory` with repairs enabled and a
/// default-configured `IntegrityManager`. Component paths start empty and
/// are filled by `discover_components`.
pub fn new(index_directory: PathBuf, config: ShardexConfig) -> Self {
    Self {
        index_directory,
        config,
        repair_enabled: true,
        manager: IntegrityManager::new(IntegrityConfig::default()),
        component_paths: HashMap::new(),
    }
}
pub fn new_read_only(index_directory: PathBuf, config: ShardexConfig) -> Self {
let mut checker = Self::new(index_directory, config);
checker.repair_enabled = false;
checker
}
/// Enables or disables repair behavior for subsequent validation runs.
pub fn set_repair_enabled(&mut self, enabled: bool) {
self.repair_enabled = enabled;
}
pub fn discover_components(&mut self) -> Result<(), ShardexError> {
self.component_paths.clear();
let vector_files = self.find_files_by_pattern("**/*.vectors")?;
self.component_paths
.insert(ComponentType::VectorStorage, vector_files);
let posting_files = self.find_files_by_pattern("**/*.postings")?;
self.component_paths
.insert(ComponentType::PostingStorage, posting_files);
let wal_files = self.find_files_by_pattern("wal/*.log")?;
self.component_paths
.insert(ComponentType::WalSegments, wal_files);
let shardex_files = self.find_files_by_pattern("centroids/*.shx")?;
self.component_paths
.insert(ComponentType::ShardexSegments, shardex_files);
Ok(())
}
fn find_files_by_pattern(&self, pattern: &str) -> Result<Vec<PathBuf>, ShardexError> {
use glob::glob;
let search_pattern = self.index_directory.join(pattern);
let pattern_str = search_pattern.to_string_lossy();
let mut files = Vec::new();
for entry in glob(&pattern_str).map_err(|e| ShardexError::Corruption(format!("Glob pattern error: {}", e)))? {
match entry {
Ok(path) => files.push(path),
Err(e) => {
tracing::warn!("Error reading file entry: {}", e);
}
}
}
Ok(files)
}
/// Re-discovers all components, validates each, then runs cross-reference
/// checks. The report's `total_validation_time` is overwritten at the end
/// with the wall-clock time for the whole run.
pub async fn verify_full_integrity(&mut self) -> Result<IntegrityReport, ShardexError> {
    let started = Instant::now();
    self.discover_components()?;
    let mut report = IntegrityReport::new();
    // Snapshot the path map so the `&mut self` validators can run while
    // we iterate over it.
    let snapshot = self.component_paths.clone();
    for (&component_type, file_paths) in &snapshot {
        let result = self
            .verify_component_integrity(component_type, file_paths)
            .await?;
        report.add_component_result(component_type, result);
    }
    for issue in self.verify_cross_references().await? {
        report.add_cross_reference_issue(issue);
    }
    report.total_validation_time = started.elapsed();
    report.generate_summary();
    Ok(report)
}
/// Validates only the requested components, discovering file paths on first
/// use. Components without discovered paths are silently skipped, and the
/// cross-reference pass runs only when `CrossReferences` was requested.
pub async fn verify_incremental(&mut self, components: &[ComponentType]) -> Result<IntegrityReport, ShardexError> {
    let started = Instant::now();
    if self.component_paths.is_empty() {
        self.discover_components()?;
    }
    let mut report = IntegrityReport::new();
    // Snapshot so the `&mut self` validators can run during iteration.
    let snapshot = self.component_paths.clone();
    for &component_type in components {
        if let Some(file_paths) = snapshot.get(&component_type) {
            let result = self
                .verify_component_integrity(component_type, file_paths)
                .await?;
            report.add_component_result(component_type, result);
        }
    }
    if components.contains(&ComponentType::CrossReferences) {
        for issue in self.verify_cross_references().await? {
            report.add_cross_reference_issue(issue);
        }
    }
    report.total_validation_time = started.elapsed();
    report.generate_summary();
    Ok(report)
}
/// Validates every file belonging to one component and aggregates the
/// results into a single `ValidationResult`: byte counts are summed,
/// per-file checksums are XOR-combined, and on failure the first per-file
/// corruption report (or a generic fallback) is attached.
async fn verify_component_integrity(
&mut self,
component_type: ComponentType,
file_paths: &[PathBuf],
) -> Result<ValidationResult, ShardexError> {
let start_time = Instant::now();
let mut total_bytes = 0u64;
let mut combined_checksum = 0u32;
let mut any_failed = false;
// First failing file's report, kept for the aggregate failure result.
let mut first_corruption: Option<CorruptionReport> = None;
for file_path in file_paths {
// Dispatch to the component-specific validator.
let result = match component_type {
ComponentType::VectorStorage => self.verify_vector_storage_file(file_path).await?,
ComponentType::PostingStorage => self.verify_posting_storage_file(file_path).await?,
ComponentType::WalSegments => self.verify_wal_segment_file(file_path).await?,
ComponentType::ShardexSegments => self.verify_shardex_segment_file(file_path).await?,
ComponentType::BloomFilters => self.verify_bloom_filter_file(file_path).await?,
// CrossReferences has no per-file validation; it is handled by
// `verify_cross_references` instead.
ComponentType::CrossReferences => {
continue;
}
};
total_bytes += result.bytes_validated;
// XOR-combine so the component checksum is order-independent.
combined_checksum ^= result.data_checksum;
if !result.is_valid() {
any_failed = true;
if first_corruption.is_none() {
first_corruption = result.corruption_report().cloned();
}
}
}
if any_failed {
// Fall back to a generic report if no per-file report was captured.
let corruption = first_corruption.unwrap_or_else(|| CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: None,
corruption_size: None,
description: format!("Component {} validation failed", component_type_name(component_type)),
recovery_recommendations: vec!["Check individual files for specific issues".to_string()],
severity: 0.5,
// Recoverability mirrors whether this checker may repair at all.
is_recoverable: self.repair_enabled,
detected_at: SystemTime::now(),
});
Ok(ValidationResult::failure(
corruption,
start_time.elapsed(),
total_bytes,
combined_checksum,
))
} else {
Ok(ValidationResult::success(
start_time.elapsed(),
total_bytes,
combined_checksum,
))
}
}
/// Validates a single vector-storage file: first the manager's generic
/// file-level validation, then (only if that passed) the vector-specific
/// checksum/quality checks. The file path is patched into any resulting
/// corruption report, since the deeper checks use placeholder paths.
async fn verify_vector_storage_file(&mut self, file_path: &Path) -> Result<ValidationResult, ShardexError> {
let start_time = Instant::now();
let mmf = MemoryMappedFile::open_read_only(file_path)?;
let mut result = self.manager.validate_file(&mmf)?;
if result.is_valid() {
// Generic checks passed; escalate to vector-specific validation and
// downgrade the result to a failure if it finds a problem.
if let Err(enhanced_issue) = self.verify_vector_storage_checksums(&mmf).await {
result = ValidationResult::failure(
*enhanced_issue,
start_time.elapsed(),
result.bytes_validated,
result.data_checksum,
);
}
}
// Deeper checks report placeholder paths; substitute the real one.
if let Some(ref mut report) = result.corruption_report {
report.file_path = file_path.to_path_buf();
}
Ok(result)
}
/// Validates a single posting-storage file: generic file-level validation
/// first, then posting-specific checksum/quality checks if that passed.
/// Mirrors `verify_vector_storage_file`, including patching the real file
/// path into any corruption report.
async fn verify_posting_storage_file(&mut self, file_path: &Path) -> Result<ValidationResult, ShardexError> {
let start_time = Instant::now();
let mmf = MemoryMappedFile::open_read_only(file_path)?;
let mut result = self.manager.validate_file(&mmf)?;
if result.is_valid() {
// Escalate to posting-specific validation on generic success.
if let Err(enhanced_issue) = self.verify_posting_storage_checksums(&mmf).await {
result = ValidationResult::failure(
*enhanced_issue,
start_time.elapsed(),
result.bytes_validated,
result.data_checksum,
);
}
}
// Deeper checks report placeholder paths; substitute the real one.
if let Some(ref mut report) = result.corruption_report {
report.file_path = file_path.to_path_buf();
}
Ok(result)
}
/// Validates a WAL segment file using the manager's generic path-based
/// validation; no WAL-specific checks are performed here.
async fn verify_wal_segment_file(&mut self, file_path: &Path) -> Result<ValidationResult, ShardexError> {
    self.manager.validate_file_path(file_path)
}
/// Validates a shardex segment file using the manager's generic path-based
/// validation; no segment-specific checks are performed here.
async fn verify_shardex_segment_file(&mut self, file_path: &Path) -> Result<ValidationResult, ShardexError> {
    self.manager.validate_file_path(file_path)
}
/// Validates a bloom filter file using the manager's generic path-based
/// validation; no filter-specific checks are performed here.
async fn verify_bloom_filter_file(&mut self, file_path: &Path) -> Result<ValidationResult, ShardexError> {
    self.manager.validate_file_path(file_path)
}
/// Vector-storage-specific validation: reads the `VectorStorageHeader` at
/// offset 0, checks that the SIMD-aligned vector data region fits inside the
/// mapped file, then runs data-quality analysis on that region. Reports use
/// the placeholder path "vector_storage"; the caller patches in the real path.
async fn verify_vector_storage_checksums(&self, mmf: &MemoryMappedFile) -> Result<(), Box<CorruptionReport>> {
use crate::vector_storage::VectorStorageHeader;
let header: VectorStorageHeader = mmf.read_at(0).map_err(|e| CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("vector_storage"),
corruption_offset: Some(0),
corruption_size: None,
description: format!("Failed to read vector storage header for checksum verification: {}", e),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.9,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
// Expected data region: capacity * dimension f32s, rounded up to the
// header's SIMD alignment.
let vector_data_start = header.vector_data_offset as usize;
let vector_data_size =
(header.capacity as usize) * (header.vector_dimension as usize) * std::mem::size_of::<f32>();
let aligned_size = Self::align_size(vector_data_size, header.simd_alignment as usize);
let file_data = mmf.as_slice();
if vector_data_start + aligned_size > file_data.len() {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("vector_storage"),
corruption_offset: Some(vector_data_start as u64),
corruption_size: Some(aligned_size as u64),
description: "Vector data region extends beyond file bounds".to_string(),
recovery_recommendations: vec!["File may be truncated or corrupted".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
}));
}
// Bounds are fine; scan the region for NaN/Inf/all-zero vectors.
self.verify_vector_data_quality(&file_data[vector_data_start..vector_data_start + aligned_size], &header)?;
Ok(())
}
/// Posting-storage-specific validation: reads the `PostingStorageHeader` at
/// offset 0, checks the data region (header end .. calculated file size)
/// fits inside the mapped file, then runs data-quality analysis on it.
/// Reports use the placeholder path "posting_storage"; the caller patches in
/// the real path.
async fn verify_posting_storage_checksums(&self, mmf: &MemoryMappedFile) -> Result<(), Box<CorruptionReport>> {
use crate::posting_storage::PostingStorageHeader;
let header: PostingStorageHeader = mmf.read_at(0).map_err(|e| CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: None,
description: format!("Failed to read posting storage header for checksum verification: {}", e),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.9,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
let file_data = mmf.as_slice();
// Payload begins immediately after the fixed-size header.
let posting_data_start = std::mem::size_of::<PostingStorageHeader>();
let posting_data_end = header.calculate_file_size();
if posting_data_end > file_data.len() {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(posting_data_start as u64),
corruption_size: Some((posting_data_end - posting_data_start) as u64),
description: "Posting data region extends beyond file bounds".to_string(),
recovery_recommendations: vec!["File may be truncated or corrupted".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
}));
}
// Bounds are fine; run the structural/content checks on the payload.
self.verify_posting_data_quality(&file_data[posting_data_start..posting_data_end], &header)?;
Ok(())
}
/// Scans up to `current_count` vectors for numeric-quality problems and
/// fails when a ratio threshold is exceeded: NaN-containing vectors > 10%,
/// infinite-containing vectors > 5%, or all-zero vectors > 50%.
fn verify_vector_data_quality(
&self,
vector_data: &[u8],
header: &crate::vector_storage::VectorStorageHeader,
) -> Result<(), Box<CorruptionReport>> {
let vector_count = header.current_count as usize;
let vector_dimension = header.vector_dimension as usize;
let vector_size_bytes = vector_dimension * std::mem::size_of::<f32>();
// Counts of vectors containing at least one NaN / infinite value, and of
// vectors whose components are all exactly zero.
let mut nan_vectors = 0;
let mut infinite_vectors = 0;
let mut zero_vectors = 0;
for i in 0..vector_count {
let vector_start = i * vector_size_bytes;
// Stop rather than error if the claimed count overruns the region.
if vector_start + vector_size_bytes > vector_data.len() {
break;
}
let vector_bytes = &vector_data[vector_start..vector_start + vector_size_bytes];
let vector_floats = bytemuck::cast_slice::<u8, f32>(vector_bytes);
// Defensive: skip if the reinterpreted slice has the wrong length.
if vector_floats.len() != vector_dimension {
continue; }
let mut has_nan = false;
let mut has_infinite = false;
let mut all_zero = true;
for &value in vector_floats {
if value.is_nan() {
has_nan = true;
} else if value.is_infinite() {
has_infinite = true;
} else if value != 0.0 {
all_zero = false;
}
}
if has_nan {
nan_vectors += 1;
}
if has_infinite {
infinite_vectors += 1;
}
if all_zero {
zero_vectors += 1;
}
}
let total_vectors = vector_count;
if total_vectors > 0 {
let nan_ratio = nan_vectors as f64 / total_vectors as f64;
let infinite_ratio = infinite_vectors as f64 / total_vectors as f64;
let zero_ratio = zero_vectors as f64 / total_vectors as f64;
// More than 10% NaN vectors is treated as corruption.
if nan_ratio > 0.1 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("vector_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Excessive NaN values in vectors: {:.1}% ({}/{})",
nan_ratio * 100.0,
nan_vectors,
total_vectors
),
recovery_recommendations: vec![
"Check vector computation logic".to_string(),
"Verify input data quality".to_string(),
"Consider rebuilding vectors from source".to_string(),
],
severity: 0.8,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// More than 5% vectors containing infinities.
if infinite_ratio > 0.05 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("vector_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Excessive infinite values in vectors: {:.1}% ({}/{})",
infinite_ratio * 100.0,
infinite_vectors,
total_vectors
),
recovery_recommendations: vec![
"Check for numerical overflow in vector operations".to_string(),
"Validate normalization procedures".to_string(),
],
severity: 0.7,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// More than half the vectors being all-zero is suspicious, though not
// necessarily fatal (lowest severity of the three checks).
if zero_ratio > 0.5 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("vector_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Suspicious number of zero vectors: {:.1}% ({}/{})",
zero_ratio * 100.0,
zero_vectors,
total_vectors
),
recovery_recommendations: vec![
"Verify vector initialization logic".to_string(),
"Check if vectors are being properly computed".to_string(),
],
severity: 0.6,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
}
Ok(())
}
/// Structural and content validation of a posting-storage payload.
///
/// `posting_data` is the file contents *after* the header, so all header
/// offsets are converted to header-relative offsets before use. Checks, in
/// order: count vs capacity, header offset sanity, array bounds, active
/// count consistency, then statistical content checks (invalid document
/// IDs, zero/excessive lengths, start+length overflow, ordering violations).
fn verify_posting_data_quality(
&self,
posting_data: &[u8],
header: &crate::posting_storage::PostingStorageHeader,
) -> Result<(), Box<CorruptionReport>> {
use crate::identifiers::DocumentId;
// Empty storage is trivially valid.
if header.current_count == 0 {
return Ok(()); }
if header.current_count > header.capacity {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Current count {} exceeds capacity {}",
header.current_count, header.capacity
),
recovery_recommendations: vec!["Rebuild posting storage with correct capacity".to_string()],
severity: 0.9,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
let data_size = posting_data.len();
let header_size = std::mem::size_of::<crate::posting_storage::PostingStorageHeader>();
// All four array offsets must point past the header.
if header.document_ids_offset < header_size as u64
|| header.starts_offset < header_size as u64
|| header.lengths_offset < header_size as u64
|| header.deleted_flags_offset < header_size as u64
{
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: Some(header_size as u64),
description: "Invalid header offsets - offsets point before end of header".to_string(),
recovery_recommendations: vec!["Header is corrupted; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
}));
}
// Convert each absolute file offset to an offset into `posting_data`
// (which starts at the end of the header). `checked_sub` guards against
// underflow even though the range check above should already prevent it.
let document_ids_offset_rel = (header.document_ids_offset as usize)
.checked_sub(header_size)
.ok_or_else(|| {
Box::new(CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: Some(header_size as u64),
description: "Document IDs offset calculation overflow".to_string(),
recovery_recommendations: vec!["Header is corrupted; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
})
})?;
let starts_offset_rel = (header.starts_offset as usize)
.checked_sub(header_size)
.ok_or_else(|| {
Box::new(CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: Some(header_size as u64),
description: "Starts offset calculation overflow".to_string(),
recovery_recommendations: vec!["Header is corrupted; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
})
})?;
let lengths_offset_rel = (header.lengths_offset as usize)
.checked_sub(header_size)
.ok_or_else(|| {
Box::new(CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: Some(header_size as u64),
description: "Lengths offset calculation overflow".to_string(),
recovery_recommendations: vec!["Header is corrupted; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
})
})?;
let deleted_flags_offset_rel = (header.deleted_flags_offset as usize)
.checked_sub(header_size)
.ok_or_else(|| {
Box::new(CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: Some(0),
corruption_size: Some(header_size as u64),
description: "Deleted flags offset calculation overflow".to_string(),
recovery_recommendations: vec!["Header is corrupted; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
})
})?;
// Array extents: document IDs are 16 bytes (u128) each, starts/lengths
// 4 bytes (u32) each, deleted flags one bit each (packed into bytes).
let document_ids_end = document_ids_offset_rel + (header.current_count as usize * 16);
let starts_end = starts_offset_rel + (header.current_count as usize * 4);
let lengths_end = lengths_offset_rel + (header.current_count as usize * 4);
let deleted_flags_end = deleted_flags_offset_rel + ((header.current_count as usize + 7) / 8);
if document_ids_end > data_size
|| starts_end > data_size
|| lengths_end > data_size
|| deleted_flags_end > data_size
{
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: "Posting data arrays extend beyond file bounds".to_string(),
recovery_recommendations: vec!["File may be truncated; restore from backup".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
}));
}
// Content statistics accumulated over all postings.
let mut invalid_document_ids = 0;
let mut zero_lengths = 0;
let mut excessive_lengths = 0;
let mut overflow_ranges = 0;
let mut active_count_actual = 0;
// NOTE(review): `document_id_counts` is accumulated but never read in
// this function — either dead code or an unfinished duplicate-ID check.
let mut document_id_counts: std::collections::HashMap<DocumentId, usize> = std::collections::HashMap::new();
let mut previous_doc_id = None;
let mut ordering_violations = 0;
// Postings longer than this are flagged as suspicious.
const MAX_REASONABLE_LENGTH: u32 = 10_000_000;
for i in 0..header.current_count as usize {
// Deleted flag: one bit per posting, little-endian within each byte.
let byte_index = i / 8;
let bit_index = i % 8;
let deleted_byte_offset = deleted_flags_offset_rel + byte_index;
if deleted_byte_offset < data_size {
let deleted_byte = posting_data[deleted_byte_offset];
let is_deleted = (deleted_byte >> bit_index) & 1 != 0;
if !is_deleted {
active_count_actual += 1;
}
}
// Document ID: 16-byte little-endian u128; 0 and u128::MAX are invalid.
let doc_id_offset = document_ids_offset_rel + (i * 16);
if doc_id_offset + 16 <= data_size {
let doc_id_bytes = &posting_data[doc_id_offset..doc_id_offset + 16];
let document_id =
DocumentId::from_raw(u128::from_le_bytes(doc_id_bytes.try_into().unwrap_or([0u8; 16])));
let raw_id = document_id.raw();
if raw_id == 0 || raw_id == u128::MAX {
invalid_document_ids += 1;
} else {
*document_id_counts.entry(document_id).or_insert(0) += 1;
// Track how often IDs appear out of ascending order.
if let Some(prev_id) = previous_doc_id {
if document_id < prev_id {
ordering_violations += 1;
}
}
previous_doc_id = Some(document_id);
}
}
// Start/length pair: both little-endian u32s at parallel indices.
let start_offset = starts_offset_rel + (i * 4);
if start_offset + 4 <= data_size {
let start_bytes = &posting_data[start_offset..start_offset + 4];
let start = u32::from_le_bytes(start_bytes.try_into().unwrap_or([0u8; 4]));
let length_offset = lengths_offset_rel + (i * 4);
if length_offset + 4 <= data_size {
let length_bytes = &posting_data[length_offset..length_offset + 4];
let length = u32::from_le_bytes(length_bytes.try_into().unwrap_or([0u8; 4]));
if length == 0 {
zero_lengths += 1;
} else if length > MAX_REASONABLE_LENGTH {
excessive_lengths += 1;
}
// start + length must not overflow u32.
if start.checked_add(length).is_none() {
overflow_ranges += 1;
}
}
}
}
// The header's active count must match the recount from deleted flags.
if active_count_actual != header.active_count as usize {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Active count mismatch: header claims {}, actual {}",
header.active_count, active_count_actual
),
recovery_recommendations: vec!["Recompute active count from posting data".to_string()],
severity: 0.6,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// Ratio-based checks; thresholds differ per category.
let total_postings = header.current_count as usize;
if total_postings > 0 {
let invalid_ratio = invalid_document_ids as f64 / total_postings as f64;
let zero_length_ratio = zero_lengths as f64 / total_postings as f64;
let excessive_length_ratio = excessive_lengths as f64 / total_postings as f64;
let overflow_ratio = overflow_ranges as f64 / total_postings as f64;
let ordering_violation_ratio = ordering_violations as f64 / total_postings as f64;
// More than 10% invalid document IDs.
if invalid_ratio > 0.1 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Too many invalid document IDs: {:.1}% ({} out of {})",
invalid_ratio * 100.0,
invalid_document_ids,
total_postings
),
recovery_recommendations: vec!["Regenerate document IDs from valid postings".to_string()],
severity: 0.8,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// More than 5% zero-length postings.
if zero_length_ratio > 0.05 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Too many zero-length postings: {:.1}% ({} out of {})",
zero_length_ratio * 100.0,
zero_lengths,
total_postings
),
recovery_recommendations: vec!["Remove or fix zero-length postings".to_string()],
severity: 0.4,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// More than 1% implausibly long postings.
if excessive_length_ratio > 0.01 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Too many excessively long postings: {:.1}% ({} out of {})",
excessive_length_ratio * 100.0,
excessive_lengths,
total_postings
),
recovery_recommendations: vec!["Validate posting lengths against document sizes".to_string()],
severity: 0.7,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// Any start+length overflow at all is an error.
if overflow_ratio > 0.0 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!("Found {} postings with start+length overflow", overflow_ranges),
recovery_recommendations: vec!["Fix posting ranges to prevent numeric overflow".to_string()],
severity: 0.9,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// More than 30% out-of-order document IDs.
if ordering_violation_ratio > 0.3 {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("posting_storage"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Too many ordering violations: {:.1}% ({} out of {})",
ordering_violation_ratio * 100.0,
ordering_violations,
total_postings
),
recovery_recommendations: vec!["Re-sort postings by document ID".to_string()],
severity: 0.5,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
}
Ok(())
}
/// Rounds `size` up to the next multiple of `alignment`.
///
/// Uses the standard power-of-two mask trick, which is only correct when
/// `alignment` is a non-zero power of two (zero would also underflow in
/// `alignment - 1`). The original silently assumed this invariant; make it
/// explicit in debug builds. Release behavior for valid inputs is unchanged.
fn align_size(size: usize, alignment: usize) -> usize {
    debug_assert!(
        alignment.is_power_of_two(),
        "align_size requires a non-zero power-of-two alignment, got {}",
        alignment
    );
    (size + alignment - 1) & !(alignment - 1)
}
/// Runs cross-component consistency checks. Currently the only check is
/// posting-storage <-> vector-storage pairing; when either component has no
/// discovered files, no issues are produced.
async fn verify_cross_references(&mut self) -> Result<Vec<CorruptionReport>, ShardexError> {
    // Snapshot so the `&mut self` consistency check can run while we hold
    // references into the path map.
    let snapshot = self.component_paths.clone();
    let postings = snapshot.get(&ComponentType::PostingStorage);
    let vectors = snapshot.get(&ComponentType::VectorStorage);
    match (postings, vectors) {
        (Some(posting_files), Some(vector_files)) => {
            self.verify_posting_vector_consistency(posting_files, vector_files)
                .await
        }
        _ => Ok(Vec::new()),
    }
}
/// Pairs posting and vector storage files by shard id (file stem) and
/// reports three kinds of problems: posting files with no matching vector
/// file (severity 0.9, not recoverable), vector files with no matching
/// posting file (severity 0.7, recoverable), and per-shard consistency
/// failures from `verify_shard_consistency`.
async fn verify_posting_vector_consistency(
&mut self,
posting_files: &[PathBuf],
vector_files: &[PathBuf],
) -> Result<Vec<CorruptionReport>, ShardexError> {
let mut issues = Vec::new();
tracing::debug!(
"Cross-reference validation between {} posting files and {} vector files",
posting_files.len(),
vector_files.len()
);
// Index vector files by shard id for O(1) pairing.
let mut vector_storage_map: HashMap<String, PathBuf> = HashMap::new();
for vector_file in vector_files {
if let Some(shard_id) = self.extract_shard_id_from_path(vector_file) {
vector_storage_map.insert(shard_id, vector_file.clone());
}
}
// Pass 1: every posting file must have a paired vector file; paired
// shards get a deeper consistency check.
for posting_file in posting_files {
if let Some(shard_id) = self.extract_shard_id_from_path(posting_file) {
if let Some(vector_file) = vector_storage_map.get(&shard_id) {
if let Some(consistency_issue) = self
.verify_shard_consistency(posting_file, vector_file)
.await?
{
issues.push(consistency_issue);
}
} else {
issues.push(CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: posting_file.clone(),
corruption_offset: None,
corruption_size: None,
description: format!(
"Posting storage {} has no corresponding vector storage",
posting_file.display()
),
recovery_recommendations: vec![
"Ensure vector storage file exists for this shard".to_string(),
"Check if files were accidentally deleted".to_string(),
"Restore missing files from backup".to_string(),
],
severity: 0.9,
is_recoverable: false,
detected_at: SystemTime::now(),
});
}
}
}
// Pass 2: flag orphaned vector files (no posting counterpart).
for (shard_id, vector_file) in &vector_storage_map {
let has_posting_file = posting_files
.iter()
.any(|pf| self.extract_shard_id_from_path(pf).as_ref() == Some(shard_id));
if !has_posting_file {
issues.push(CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: vector_file.clone(),
corruption_offset: None,
corruption_size: None,
description: format!(
"Vector storage {} has no corresponding posting storage",
vector_file.display()
),
recovery_recommendations: vec![
"Ensure posting storage file exists for this shard".to_string(),
"Check if files were accidentally deleted".to_string(),
"Remove orphaned vector storage if no postings exist".to_string(),
],
severity: 0.7,
is_recoverable: true,
detected_at: SystemTime::now(),
});
}
}
Ok(issues)
}
/// Derive a shard identifier from a storage file path.
///
/// The shard id is the file stem (file name without extension). Returns
/// `None` when the path has no stem or the stem is not valid UTF-8.
fn extract_shard_id_from_path(&self, path: &Path) -> Option<String> {
    let stem = path.file_stem()?;
    Some(stem.to_str()?.to_string())
}
/// Run cross-file consistency checks on a posting/vector storage pair.
///
/// Returns `Ok(Some(report))` for the first inconsistency found, `Ok(None)`
/// when the pair is consistent — or when either file fails its own
/// standalone validation (per-file corruption is reported elsewhere).
async fn verify_shard_consistency(
    &mut self,
    posting_file: &Path,
    vector_file: &Path,
) -> Result<Option<CorruptionReport>, ShardexError> {
    let posting_mmf = MemoryMappedFile::open_read_only(posting_file)?;
    let vector_mmf = MemoryMappedFile::open_read_only(vector_file)?;

    // Bail out of cross-validation when either side is individually invalid.
    if !self.manager.validate_file(&posting_mmf)?.is_valid() {
        return Ok(None);
    }
    if !self.manager.validate_file(&vector_mmf)?.is_valid() {
        return Ok(None);
    }

    // Header compatibility first, then capacity/count consistency; report
    // the first failure found.
    if let Err(issue) = self.verify_header_compatibility(&posting_mmf, &vector_mmf).await {
        return Ok(Some(*issue));
    }
    if let Err(issue) = self.verify_capacity_consistency(&posting_mmf, &vector_mmf).await {
        return Ok(Some(*issue));
    }
    Ok(None)
}
/// Check that a posting/vector storage pair have compatible headers.
///
/// Reads the typed header at offset 0 of each mapped file and verifies that
/// (a) both report the same `capacity` and (b) the vector header's dimension
/// matches the configured `vector_size`. Returns the first mismatch as a
/// boxed `CorruptionReport`.
///
/// NOTE(review): the reports use placeholder `file_path` values
/// ("posting_file", "vector_file", "cross_reference") because only the
/// mappings — not the real paths — are available here; callers should not
/// treat `file_path` as a usable filesystem path. TODO: thread real paths in.
async fn verify_header_compatibility(
&self,
posting_mmf: &MemoryMappedFile,
vector_mmf: &MemoryMappedFile,
) -> Result<(), Box<CorruptionReport>> {
use crate::posting_storage::PostingStorageHeader;
use crate::vector_storage::VectorStorageHeader;
// `?` converts the plain `CorruptionReport` into `Box<CorruptionReport>`
// via the std `From<T> for Box<T>` impl.
let posting_header: PostingStorageHeader = posting_mmf.read_at(0).map_err(|e| CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("posting_file"),
corruption_offset: Some(0),
corruption_size: None,
description: format!("Failed to read posting storage header: {}", e),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.8,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
let vector_header: VectorStorageHeader = vector_mmf.read_at(0).map_err(|e| CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("vector_file"),
corruption_offset: Some(0),
corruption_size: None,
description: format!("Failed to read vector storage header: {}", e),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.8,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
// Both sides of a shard must be sized for the same number of entries.
if posting_header.capacity != vector_header.capacity {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("cross_reference"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Capacity mismatch: posting storage has {}, vector storage has {}",
posting_header.capacity, vector_header.capacity
),
recovery_recommendations: vec![
"Rebuild indices to ensure consistency".to_string(),
"Check for partial updates or corruption".to_string(),
],
severity: 0.8,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// The on-disk vector dimension must match the active configuration;
// a mismatch means the file belongs to a differently-configured index.
let expected_vector_size = self.config.vector_size;
if vector_header.vector_dimension as usize != expected_vector_size {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("vector_file"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Vector dimension mismatch: expected {}, found {}",
expected_vector_size, vector_header.vector_dimension
),
recovery_recommendations: vec![
"Check configuration consistency".to_string(),
"Rebuild vector storage with correct dimensions".to_string(),
],
severity: 0.9,
is_recoverable: false,
detected_at: SystemTime::now(),
}));
}
Ok(())
}
/// Check count/capacity invariants within and across a storage pair.
///
/// Verifies three invariants, reporting the first violation:
/// 1. posting `current_count <= capacity`
/// 2. vector `current_count <= capacity`
/// 3. posting `current_count == vector current_count`
///
/// NOTE(review): like `verify_header_compatibility`, the reports carry
/// placeholder `file_path` values rather than real paths.
async fn verify_capacity_consistency(
&self,
posting_mmf: &MemoryMappedFile,
vector_mmf: &MemoryMappedFile,
) -> Result<(), Box<CorruptionReport>> {
use crate::posting_storage::PostingStorageHeader;
use crate::vector_storage::VectorStorageHeader;
// Re-read both typed headers; `?` boxes the report via `From<T> for Box<T>`.
let posting_header: PostingStorageHeader = posting_mmf.read_at(0).map_err(|_| CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("posting_file"),
corruption_offset: Some(0),
corruption_size: None,
description: "Failed to read posting storage header for capacity check".to_string(),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.8,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
let vector_header: VectorStorageHeader = vector_mmf.read_at(0).map_err(|_| CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("vector_file"),
corruption_offset: Some(0),
corruption_size: None,
description: "Failed to read vector storage header for capacity check".to_string(),
recovery_recommendations: vec!["Check file integrity".to_string()],
severity: 0.8,
is_recoverable: false,
detected_at: SystemTime::now(),
})?;
// Invariant 1: a count beyond capacity means the header itself is bogus.
if posting_header.current_count > posting_header.capacity {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::StructuralInconsistency,
file_path: PathBuf::from("posting_file"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Posting storage current_count ({}) exceeds capacity ({})",
posting_header.current_count, posting_header.capacity
),
recovery_recommendations: vec![
"Check for header corruption".to_string(),
"Rebuild posting storage with correct counts".to_string(),
],
severity: 0.9,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// Invariant 2: same check on the vector side.
if vector_header.current_count > vector_header.capacity {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::StructuralInconsistency,
file_path: PathBuf::from("vector_file"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Vector storage current_count ({}) exceeds capacity ({})",
vector_header.current_count, vector_header.capacity
),
recovery_recommendations: vec![
"Check for header corruption".to_string(),
"Rebuild vector storage with correct counts".to_string(),
],
severity: 0.9,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
// Invariant 3: counts must agree across the pair; a mismatch typically
// indicates an interrupted write that updated only one side.
if posting_header.current_count != vector_header.current_count {
return Err(Box::new(CorruptionReport {
corruption_type: CorruptionType::CrossValidationFailure,
file_path: PathBuf::from("cross_reference"),
corruption_offset: None,
corruption_size: None,
description: format!(
"Count mismatch: posting storage has {}, vector storage has {}",
posting_header.current_count, vector_header.current_count
),
recovery_recommendations: vec![
"Check for incomplete transactions".to_string(),
"Run consistency repair to synchronize counts".to_string(),
"Investigate recent write operations".to_string(),
],
severity: 0.7,
is_recoverable: true,
detected_at: SystemTime::now(),
}));
}
Ok(())
}
/// Run automatic repair over a set of integrity issues.
///
/// Issues are processed in ascending `priority` order and every outcome is
/// collected into the returned `RepairReport`. Fails with
/// `ShardexError::Corruption` when repair is disabled on this checker.
pub async fn attempt_repair(&mut self, issues: &[IntegrityIssue]) -> Result<RepairReport, ShardexError> {
    if !self.repair_enabled {
        return Err(ShardexError::Corruption(
            "Repair operations are disabled for this IntegrityChecker".to_string(),
        ));
    }

    // Sort a copy so the caller's slice order is left untouched.
    let mut ordered = issues.to_vec();
    ordered.sort_by_key(|i| i.priority);

    let mut report = RepairReport::new();
    for issue in &ordered {
        let outcome = self.repair_single_issue(issue).await?;
        report.add_repair_result(outcome);
    }
    Ok(report)
}
async fn repair_single_issue(&mut self, issue: &IntegrityIssue) -> Result<RepairResult, ShardexError> {
let start_time = Instant::now();
if !issue.auto_repairable {
return Ok(RepairResult {
issue: issue.corruption.clone(),
success: false,
action_taken: "No automatic repair available for this issue type".to_string(),
repair_time: start_time.elapsed(),
notes: vec!["Manual intervention required".to_string()],
});
}
let (success, action, notes) = match issue.corruption.corruption_type {
CorruptionType::DataCorruption => self.repair_data_corruption(&issue.corruption).await?,
CorruptionType::PartialCorruption => self.repair_partial_corruption(&issue.corruption).await?,
CorruptionType::CrossValidationFailure => self.repair_cross_validation(&issue.corruption).await?,
CorruptionType::HeaderCorruption => self.repair_header_corruption(&issue.corruption).await?,
CorruptionType::FileTruncation => self.repair_file_truncation(&issue.corruption).await?,
CorruptionType::StructuralInconsistency => {
self.repair_structural_inconsistency(&issue.corruption)
.await?
}
};
Ok(RepairResult {
issue: issue.corruption.clone(),
success,
action_taken: action,
repair_time: start_time.elapsed(),
notes,
})
}
/// Attempt automatic repair of general data corruption.
///
/// When the corruption offset is known, checksum and NaN/infinite issues are
/// delegated to dedicated handlers. Otherwise only small (< 1 KiB),
/// low-severity (< 0.5) regions are considered zero-fill repairable;
/// everything else is left for manual intervention.
async fn repair_data_corruption(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    // A known offset lets us route to a targeted repair strategy.
    if corruption.corruption_offset.is_some() {
        let desc = &corruption.description;
        if desc.contains("checksum") {
            return self.repair_checksum_mismatch(corruption).await;
        }
        if desc.contains("NaN") || desc.contains("infinite") {
            return self.repair_vector_quality_issues(corruption).await;
        }
    }

    let mut notes = Vec::new();
    if corruption.severity < 0.5 {
        notes.push("Attempting automatic repair for low-severity corruption".to_string());
        if matches!(corruption.corruption_size, Some(size) if size < 1024) {
            notes.push("Applied zero-fill repair for small corruption region".to_string());
            return Ok((true, "Zero-filled corrupted region".to_string(), notes));
        }
    }

    notes.push("Data corruption repair requires manual intervention".to_string());
    notes.push("Consider restoring from backup".to_string());
    notes.push("Run full integrity check after manual repair".to_string());
    Ok((
        false,
        "Automatic repair not available for this corruption type".to_string(),
        notes,
    ))
}
/// Repair a file whose stored header checksum does not match its data.
///
/// Recomputes the CRC32 over the data region (everything past the header's
/// `data_offset`), writes the corrected value back into the header, syncs to
/// disk, and re-reads the header from a fresh mapping to verify the repair.
///
/// Returns `(success, action_taken, notes)`; all failure modes short of an
/// I/O error on the initial open are reported as unsuccessful results rather
/// than `Err`, so the caller's repair loop keeps going.
async fn repair_checksum_mismatch(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = Vec::new();
    if !corruption.file_path.exists() {
        notes.push("File not accessible for checksum repair".to_string());
        return Ok((false, "Cannot access file for checksum repair".to_string(), notes));
    }
    notes.push("Attempting to recalculate file checksum".to_string());

    // Read-only sanity pass: the file must at least hold a full header.
    let mmf_readonly = MemoryMappedFile::open_read_only(&corruption.file_path)?;
    let file_data = mmf_readonly.as_slice();
    if file_data.len() < FileHeader::SIZE {
        notes.push("File too small to contain valid header".to_string());
        return Ok((false, "File too small for checksum repair".to_string(), notes));
    }
    notes.push("File structure appears intact, proceeding with checksum repair".to_string());

    let mut mmf = match MemoryMappedFile::open_read_write(&corruption.file_path) {
        Ok(mmf) => mmf,
        Err(e) => {
            notes.push(format!("Failed to open file for writing: {}", e));
            return Ok((false, format!("Failed to open file for checksum repair: {}", e), notes));
        }
    };

    let mut current_header: FileHeader = mmf.read_at(0)?;
    let original_checksum = current_header.checksum;
    notes.push(format!("Original checksum: 0x{:08X}", original_checksum));

    // Checksum covers only the data region; validate the offset first.
    let data_start = current_header.data_offset as usize;
    let data_slice = mmf.as_slice();
    if data_start >= data_slice.len() {
        notes.push("Invalid data offset in header".to_string());
        return Ok((false, "Invalid header data offset".to_string(), notes));
    }
    let data_portion = &data_slice[data_start..];
    let calculated_checksum = FileHeader::crc32_hash(data_portion);
    notes.push(format!("Calculated checksum: 0x{:08X}", calculated_checksum));

    if original_checksum == calculated_checksum {
        notes.push("Checksum is actually correct, no repair needed".to_string());
        return Ok((true, "Checksum was already correct".to_string(), notes));
    }

    current_header.checksum = calculated_checksum;
    // BUG FIX: this call previously read `mmf.write_at(0, ¤t_header)` — an
    // HTML-entity-mangled `&current_header` (`&curren;` renders as `¤`) that
    // did not compile.
    mmf.write_at(0, &current_header)?;
    if let Err(e) = mmf.sync() {
        notes.push(format!("Failed to sync changes to disk: {}", e));
        return Ok((false, "Failed to write checksum repair to disk".to_string(), notes));
    }

    // Verify through a fresh read-only mapping so we observe on-disk state.
    let verification_mmf = MemoryMappedFile::open_read_only(&corruption.file_path)?;
    let repaired_header: FileHeader = verification_mmf.read_at(0)?;
    if repaired_header.checksum == calculated_checksum {
        notes.push("Checksum repair verified successfully".to_string());
        notes.push("File integrity restored".to_string());
        Ok((
            true,
            format!(
                "Checksum repaired: 0x{:08X} -> 0x{:08X}",
                original_checksum, calculated_checksum
            ),
            notes,
        ))
    } else {
        notes.push("Checksum repair verification failed".to_string());
        Ok((false, "Checksum repair could not be verified".to_string(), notes))
    }
}
/// Classify vector-quality corruption (NaN / infinite / zero vectors) and
/// report whether an automatic remedy exists.
///
/// Only the zero-vector case is considered automatically repairable; NaN and
/// infinite values require rebuilding or normalizing the vector data.
async fn repair_vector_quality_issues(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let desc = &corruption.description;

    if desc.contains("NaN") {
        let notes = vec![
            "Detected NaN values in vectors".to_string(),
            "NaN values can be replaced with zeros or interpolated values".to_string(),
        ];
        return Ok((false, "NaN repair requires vector data rebuilding".to_string(), notes));
    }

    if desc.contains("infinite") {
        let notes = vec![
            "Detected infinite values in vectors".to_string(),
            "Infinite values suggest numerical overflow in computations".to_string(),
            "Vectors should be normalized or clamped to reasonable ranges".to_string(),
        ];
        return Ok((
            false,
            "Infinite value repair requires vector normalization".to_string(),
            notes,
        ));
    }

    if desc.contains("zero vectors") {
        let notes = vec![
            "Excessive zero vectors detected".to_string(),
            "This may indicate initialization or computation issues".to_string(),
            "Zero vectors can be removed if they represent empty content".to_string(),
        ];
        return Ok((true, "Zero vector cleanup possible".to_string(), notes));
    }

    Ok((false, "Unknown vector quality issue".to_string(), Vec::new()))
}
/// Attempt repair of a partially corrupted region.
///
/// Repair succeeds only for regions smaller than one page (< 4096 bytes)
/// that `can_isolate_corruption` deems isolatable; all other cases fall
/// through to manual-recovery guidance.
async fn repair_partial_corruption(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = Vec::new();

    if let (Some(offset), Some(size)) = (corruption.corruption_offset, corruption.corruption_size) {
        notes.push(format!("Partial corruption at offset {} size {}", offset, size));
        if size >= 4096 {
            notes.push("Large corruption region, repair may not be feasible".to_string());
        } else {
            notes.push("Small corruption region, repair may be possible".to_string());
            if self.can_isolate_corruption(corruption).await {
                notes.push("Corruption region can be isolated".to_string());
                notes.push("Region can be zero-filled or reconstructed from redundant data".to_string());
                return Ok((true, "Isolated and repaired corrupted region".to_string(), notes));
            }
        }
    }

    notes.push("Identify and isolate corrupted regions".to_string());
    notes.push("Restore from redundant data if available".to_string());
    notes.push("Consider partial data recovery".to_string());
    Ok((false, "Partial corruption repair not available".to_string(), notes))
}
/// Heuristic: a corrupted region is isolatable when it is small (< 1 KiB)
/// and lies strictly past the first 1 KiB of the file — i.e. it misses the
/// header area and has intact data before it. Unknown offset or size means
/// the region cannot be isolated.
async fn can_isolate_corruption(&self, corruption: &CorruptionReport) -> bool {
    match (corruption.corruption_offset, corruption.corruption_size) {
        // offset > 1024 subsumes the original "not in header" and
        // "has surrounding data" conditions.
        (Some(offset), Some(size)) => offset > 1024 && size < 1024,
        _ => false,
    }
}
/// Dispatch a cross-validation failure to the matching repair strategy,
/// keyed on substrings of the report description.
///
/// NOTE(review): in the original flow, each delegation discarded a note
/// pushed just before the early return — each delegate builds its own note
/// list, so those notes never surfaced. Behavior is preserved here by
/// simply not writing them.
async fn repair_cross_validation(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let desc = &corruption.description;

    if desc.contains("Capacity mismatch") {
        return self.repair_capacity_mismatch(corruption).await;
    }
    if desc.contains("Count mismatch") {
        return self.repair_count_mismatch(corruption).await;
    }
    if desc.contains("no corresponding") {
        return self.repair_missing_corresponding_file(corruption).await;
    }
    if desc.contains("dimension mismatch") {
        return self.repair_dimension_mismatch(corruption).await;
    }

    let notes = vec![
        "Rebuild cross-reference indices".to_string(),
        "Verify component consistency manually".to_string(),
    ];
    Ok((
        false,
        "Cross-validation repair not available for this issue type".to_string(),
        notes,
    ))
}
/// Capacity mismatches cannot be patched in place: one component must be
/// rebuilt against the other, and choosing the source of truth is a manual
/// decision. Always reports failure with guidance notes.
async fn repair_capacity_mismatch(
    &mut self,
    _corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let notes: Vec<String> = [
        "Capacity mismatch can be resolved by rebuilding one component",
        "Choose the component with the correct capacity as the source of truth",
        "Rebuild the other component to match the correct capacity",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
    Ok((
        false,
        "Capacity mismatch repair requires component rebuilding".to_string(),
        notes,
    ))
}
/// Decide whether a posting/vector count mismatch is auto-repairable.
///
/// Severity below 0.8 is treated as a recoverable partial-write artifact;
/// anything at or above that threshold needs manual verification.
async fn repair_count_mismatch(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = vec![
        "Count mismatch suggests incomplete transaction or partial write".to_string(),
        "Can be resolved by synchronizing counts based on actual data".to_string(),
    ];

    if corruption.severity < 0.8 {
        notes.push("Low severity count mismatch can be automatically repaired".to_string());
        notes.push("Count repair would scan actual data and update headers".to_string());
        Ok((true, "Count mismatch can be automatically repaired".to_string(), notes))
    } else {
        notes.push("High severity count mismatch requires manual verification".to_string());
        Ok((
            false,
            "Count mismatch repair requires manual intervention".to_string(),
            notes,
        ))
    }
}
/// Plan a repair for an orphaned storage file (one half of a shard pair).
///
/// A missing counterpart can be recreated empty with matching capacity, so
/// both known cases report success; an unrecognized description yields a
/// failure with no strategy.
async fn repair_missing_corresponding_file(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let desc = &corruption.description;

    if desc.contains("no corresponding vector storage") {
        return Ok((
            true,
            "Can create missing vector storage file".to_string(),
            vec![
                "Missing vector storage file for posting storage".to_string(),
                "Can create empty vector storage with matching capacity".to_string(),
            ],
        ));
    }

    if desc.contains("no corresponding posting storage") {
        return Ok((
            true,
            "Can create missing posting storage file".to_string(),
            vec![
                "Missing posting storage file for vector storage".to_string(),
                "Can create empty posting storage with matching capacity".to_string(),
            ],
        ));
    }

    Ok((
        false,
        "Cannot determine missing file repair strategy".to_string(),
        vec!["Missing file repair depends on the specific component".to_string()],
    ))
}
/// Vector dimension mismatches have no in-place fix — dimensions are baked
/// into every stored vector — so this always reports failure with guidance
/// toward a full index rebuild.
async fn repair_dimension_mismatch(
    &mut self,
    _corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = Vec::with_capacity(4);
    notes.push("Vector dimension mismatch is a serious configuration issue".to_string());
    notes.push("Cannot change vector dimensions without rebuilding the entire index".to_string());
    notes.push("Check configuration consistency across all components".to_string());
    notes.push("Backup data before attempting dimension changes".to_string());
    Ok((
        false,
        "Dimension mismatch requires full index rebuild".to_string(),
        notes,
    ))
}
/// Diagnose header corruption (magic bytes, version, header checksum) and
/// collect advisory notes. This method never performs writes — it always
/// reports failure so a human can intervene.
async fn repair_header_corruption(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = Vec::new();
    let desc = &corruption.description;

    if desc.contains("magic bytes") {
        notes.push("Detected corrupted magic bytes in file header".to_string());
        // Infer which magic to restore from the file name, when available.
        if let Some(name) = corruption.file_path.file_name() {
            let name = name.to_string_lossy();
            if name.contains("vector") || name.contains(".vec") {
                notes.push("Attempting to restore vector storage magic bytes".to_string());
                notes.push("Magic bytes restoration would require write access".to_string());
            } else if name.contains("posting") || name.contains(".post") {
                notes.push("Attempting to restore posting storage magic bytes".to_string());
                notes.push("Magic bytes restoration would require write access".to_string());
            } else {
                notes.push("Cannot determine file type for magic bytes restoration".to_string());
            }
        }
    }

    if desc.contains("version") {
        notes.push("Detected version mismatch in file header".to_string());
        notes.push("Version repair may require data migration".to_string());
        notes.push("Backup recommended before version repair".to_string());
    }

    if desc.contains("header checksum") {
        notes.push("Detected header checksum mismatch".to_string());
        notes.push("Recalculating header checksum from current header data".to_string());
        notes.push("Header checksum repair would require write access".to_string());
    }

    Ok((
        false,
        "Header corruption requires careful manual intervention".to_string(),
        notes,
    ))
}
/// Attempt recovery from file truncation.
///
/// Severity above 0.8 is treated as unrecoverable (rebuild required).
/// Otherwise success depends on whether `can_reconstruct_truncated_data`
/// believes the missing tail can be rebuilt from other components.
async fn repair_file_truncation(
    &mut self,
    corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
    let mut notes = Vec::new();

    if let Some(offset) = corruption.corruption_offset {
        notes.push(format!("File truncated at offset {}", offset));
        // Page-aligned truncation usually means a clean cut-off, which is
        // easier to recover from than a mid-record tear.
        let message = if offset % 4096 == 0 {
            "Truncation at page boundary, recovery may be possible"
        } else {
            "Truncation at arbitrary offset, partial data loss likely"
        };
        notes.push(message.to_string());
    }

    if corruption.severity > 0.8 {
        notes.push("Severe truncation detected, significant data loss".to_string());
        notes.push("Consider rebuilding index from source data".to_string());
        return Ok((false, "Severe truncation requires index rebuild".to_string(), notes));
    }

    notes.push("Minor truncation detected, attempting recovery".to_string());
    if self.can_reconstruct_truncated_data(corruption).await {
        notes.push("Missing data can be reconstructed from other components".to_string());
        notes.push("Reconstruction would extend file to expected size".to_string());
        Ok((true, "Reconstructed missing data from truncation".to_string(), notes))
    } else {
        notes.push("Cannot reconstruct missing data".to_string());
        notes.push("Manual data recovery required".to_string());
        Ok((
            false,
            "File truncation cannot be automatically repaired".to_string(),
            notes,
        ))
    }
}
/// Attempt repair of structural inconsistencies, keyed on the description:
/// "index structure" issues may be rebuilt from data; "pointer"/"offset"
/// issues go through `validate_and_repair_pointers`; anything else is
/// unrepairable here.
async fn repair_structural_inconsistency(
&mut self,
corruption: &CorruptionReport,
) -> Result<(bool, String, Vec<String>), ShardexError> {
let mut notes = Vec::new();
if corruption.description.contains("index structure") {
notes.push("Detected index structure corruption".to_string());
// Feasibility check is delegated; the success path here only reports
// that a rebuild is possible — the notes describe what it would do.
if self.can_rebuild_index_structure(corruption).await {
notes.push("Index structure can be rebuilt from data".to_string());
notes.push("Rebuilding would scan all data and reconstruct indices".to_string());
Ok((true, "Index structure successfully rebuilt".to_string(), notes))
} else {
notes.push("Index structure cannot be automatically rebuilt".to_string());
Ok((
false,
"Structural inconsistency requires manual intervention".to_string(),
notes,
))
}
} else if corruption.description.contains("pointer") || corruption.description.contains("offset") {
notes.push("Detected pointer/offset corruption".to_string());
notes.push("Attempting to validate and repair offset references".to_string());
// The helper appends its own diagnostics into `notes` as it runs, so
// the final note list interleaves both layers' messages.
match self
.validate_and_repair_pointers(corruption, &mut notes)
.await
{
Ok((repaired, message)) => {
if repaired {
notes.push("Pointer corruption repair completed successfully".to_string());
Ok((true, message, notes))
} else {
notes.push("Pointer corruption could not be repaired automatically".to_string());
Ok((false, message, notes))
}
}
// Validation errors are downgraded to an unsuccessful repair result
// instead of propagating, so the overall repair loop continues.
Err(e) => {
notes.push(format!("Pointer validation failed: {}", e));
Ok((
false,
"Pointer corruption repair failed due to validation errors".to_string(),
notes,
))
}
}
} else {
notes.push("Unknown structural inconsistency detected".to_string());
notes.push("Manual analysis required to determine repair strategy".to_string());
Ok((
false,
"Unknown structural inconsistency cannot be repaired".to_string(),
notes,
))
}
}
/// Validate (and where possible repair) offset references in the corrupted
/// file, dispatching on the file extension.
///
/// "dat" files get full structural checks, "wal" files get WAL-specific
/// checks, and anything else (including extensionless files) falls back to
/// a generic scan. Diagnostics are appended to `notes`.
async fn validate_and_repair_pointers(
    &mut self,
    corruption: &CorruptionReport,
    notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
    let file_path = &corruption.file_path;
    let mmf = MemoryMappedFile::open_read_only(file_path)?;
    let file_size = mmf.len() as u64;
    notes.push(format!(
        "Validating pointers in file: {} (size: {} bytes)",
        file_path.display(),
        file_size
    ));

    match file_path.extension().and_then(|e| e.to_str()) {
        Some("dat") => {
            self.validate_data_file_pointers(&mmf, file_path, file_size, notes)
                .await
        }
        Some("wal") => {
            self.validate_wal_file_pointers(&mmf, file_size, notes)
                .await
        }
        _ => {
            self.validate_generic_file_pointers(&mmf, file_size, notes)
                .await
        }
    }
}
/// Validate the standard header and type-specific pointers of a ".dat" file.
///
/// Checks, in order: magic bytes, `data_offset` bounds and 8-byte alignment,
/// then (for vector/posting storage magics) the type-specific structure.
/// Returns `(ok, summary)`; `ok` is true when nothing was flagged or at
/// least one issue was auto-repaired.
async fn validate_data_file_pointers(
&self,
mmf: &MemoryMappedFile,
file_path: &Path,
file_size: u64,
notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
use crate::memory::StandardHeader;
if file_size < StandardHeader::SIZE as u64 {
notes.push("File too small to contain valid header".to_string());
return Ok((false, "File truncated before header".to_string()));
}
let header: StandardHeader = mmf.read_at(0)?;
// Counters: issues found vs. issues a sub-validator managed to fix.
let mut corruption_count = 0;
let mut repaired_count = 0;
if !self.is_valid_magic(&header.magic) {
// NOTE(review): magic failed `is_valid_magic` yet may still be mapped
// to a known file type here, in which case it is NOT counted as
// corruption — presumably `is_valid_magic` checks a narrower set than
// `get_file_type_from_magic` recognizes. TODO confirm that intent.
if let Some(file_type) = self.get_file_type_from_magic(&header.magic) {
notes.push(format!("Recognized file type: {}", file_type));
} else {
let magic_str = String::from_utf8_lossy(&header.magic);
notes.push(format!(
"Invalid magic bytes in header: {:?} ({})",
header.magic, magic_str
));
corruption_count += 1;
}
} else {
if let Some(file_type) = self.get_file_type_from_magic(&header.magic) {
notes.push(format!("Validated {} file format", file_type));
}
}
// data_offset must land inside the file and past the fixed header.
if header.data_offset >= file_size {
notes.push(format!(
"Header data_offset ({}) exceeds file size ({})",
header.data_offset, file_size
));
corruption_count += 1;
} else if header.data_offset < StandardHeader::SIZE as u64 {
notes.push(format!(
"Header data_offset ({}) overlaps with header region",
header.data_offset
));
corruption_count += 1;
} else {
notes.push("Header data_offset is valid".to_string());
}
// Data region is expected to start 8-byte aligned.
if header.data_offset % 8 != 0 {
notes.push(format!(
"Header data_offset ({}) is not 8-byte aligned",
header.data_offset
));
corruption_count += 1;
}
// Type-specific structural validation, selected by magic. Each
// sub-validator's boolean result is folded into the counters.
if header.magic == *crate::constants::magic::VECTOR_STORAGE {
notes.push("Validating vector storage file structure".to_string());
let vector_validation = self
.validate_vector_storage_pointers(mmf, file_path, file_size, notes)
.await?;
if vector_validation.0 {
repaired_count += 1;
} else {
corruption_count += 1;
}
}
else if header.magic == *crate::constants::magic::POSTING_STORAGE {
notes.push("Validating posting storage file structure".to_string());
let posting_validation = self
.validate_posting_storage_pointers(mmf, file_size, notes)
.await?;
if posting_validation.0 {
repaired_count += 1;
} else {
corruption_count += 1;
}
}
// Summarize: clean, partially repaired, or unrepairable.
if corruption_count == 0 {
notes.push("All pointer validations passed".to_string());
Ok((true, "No pointer corruption detected".to_string()))
} else if repaired_count > 0 {
notes.push(format!(
"Found {} pointer issues, repaired {} automatically",
corruption_count, repaired_count
));
Ok((true, "Pointer corruption partially repaired".to_string()))
} else {
notes.push(format!(
"Found {} pointer corruption issues that require manual intervention",
corruption_count
));
Ok((false, "Pointer corruption detected but not repairable".to_string()))
}
}
/// Validate the layout pointers of a vector storage file: the
/// `vector_data_offset` bounds (with an optional in-place repair when
/// repairs are enabled), the data-region size derived from
/// `capacity * vector_size_bytes`, and SIMD alignment of the data offset.
async fn validate_vector_storage_pointers(
&self,
mmf: &MemoryMappedFile,
file_path: &Path,
file_size: u64,
notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
use crate::memory::StandardHeader;
use crate::vector_storage::VectorStorageHeader;
// The typed vector header sits immediately after the standard header.
let header_size = StandardHeader::SIZE as u64;
if file_size < header_size + std::mem::size_of::<VectorStorageHeader>() as u64 {
notes.push("File too small for vector storage header".to_string());
return Ok((false, "Vector storage header incomplete".to_string()));
}
let vector_header: VectorStorageHeader = mmf.read_at(header_size as usize)?;
if vector_header.vector_data_offset >= file_size {
notes.push(format!(
"Vector data offset ({}) exceeds file size ({})",
vector_header.vector_data_offset, file_size
));
// Best-guess repair target: data begins right after both headers.
let reasonable_offset = header_size + std::mem::size_of::<VectorStorageHeader>() as u64;
if self.repair_enabled && reasonable_offset < file_size {
notes.push(format!(
"Attempting to repair vector_data_offset from {} to {}",
vector_header.vector_data_offset, reasonable_offset
));
match self
.repair_vector_data_offset(file_path, reasonable_offset, notes)
.await
{
Ok(true) => {
// Repaired in place; fall through to the remaining checks.
// NOTE(review): the checks below still use the stale
// `vector_header` read before the repair — confirm whether
// they should re-read the header.
notes.push("Vector data offset repair successful".to_string());
}
Ok(false) => {
return Ok((false, "Vector data offset repair failed".to_string()));
}
Err(e) => {
notes.push(format!("Vector data offset repair error: {}", e));
return Ok((false, "Vector data offset out of bounds".to_string()));
}
}
} else {
return Ok((false, "Vector data offset out of bounds".to_string()));
}
}
// The declared capacity must fit between the data offset and EOF.
let expected_vector_data_size = vector_header.capacity as u64 * vector_header.vector_size_bytes as u64;
let available_space = file_size - vector_header.vector_data_offset;
if expected_vector_data_size > available_space {
notes.push(format!(
"Vector data region requires {} bytes but only {} available",
expected_vector_data_size, available_space
));
return Ok((false, "Vector data region exceeds file bounds".to_string()));
}
// SIMD kernels assume the data offset honors the header's alignment;
// a zero alignment disables the check rather than dividing by zero.
if vector_header.simd_alignment > 0
&& vector_header.vector_data_offset % vector_header.simd_alignment as u64 != 0
{
notes.push(format!(
"Vector data offset ({}) not aligned to {} bytes for SIMD",
vector_header.vector_data_offset, vector_header.simd_alignment
));
return Ok((false, "Vector data not properly aligned".to_string()));
} else if vector_header.simd_alignment == 0 {
notes.push("Warning: SIMD alignment is zero, skipping alignment check".to_string());
}
notes.push("Vector storage pointer validation completed successfully".to_string());
Ok((true, "Vector storage pointers are valid".to_string()))
}
/// Validate per-posting pointers (`vector_offset` bounds, data-size bounds,
/// and 8-byte alignment) in a posting storage file.
///
/// Large files (> 100 MB with > 1000 entries) are validated by sampling
/// roughly `MAX_SAMPLE_SIZE` evenly-spaced entries; if the sampled
/// corruption rate exceeds `CORRUPTION_TOLERANCE`, the method escalates to
/// the exhaustive `validate_posting_storage_pointers_full` scan.
async fn validate_posting_storage_pointers(
&self,
mmf: &MemoryMappedFile,
file_size: u64,
notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
use crate::memory::StandardHeader;
use crate::structures::PostingHeader;
let header_size = StandardHeader::SIZE as u64;
let posting_header_size = std::mem::size_of::<PostingHeader>() as u64;
if file_size < header_size + posting_header_size {
notes.push("File too small for posting data".to_string());
return Ok((false, "Posting data incomplete".to_string()));
}
// Sampling thresholds: files over 100 MB with more than 1000 entries are
// sampled; a >5% sampled corruption rate triggers a full rescan.
const LARGE_FILE_THRESHOLD: u64 = 100 * 1024 * 1024; const MAX_SAMPLE_SIZE: usize = 1000; const CORRUPTION_TOLERANCE: f64 = 0.05;
let estimated_posting_count = ((file_size - header_size) / posting_header_size) as usize;
let use_sampling = file_size > LARGE_FILE_THRESHOLD && estimated_posting_count > MAX_SAMPLE_SIZE;
if use_sampling {
notes.push(format!(
"Large file detected ({} MB, ~{} entries). Using sampling validation for performance.",
file_size / (1024 * 1024),
estimated_posting_count
));
}
let mut offset = header_size;
let mut posting_count = 0;
let mut corrupt_pointers = 0;
let mut validation_entries = 0;
let mut _last_progress_report = 0;
// Walk fixed-size posting headers laid out contiguously after the file
// header; stop before a partial trailing record.
while offset + posting_header_size <= file_size {
let posting: PostingHeader = mmf.read_at(offset as usize)?;
posting_count += 1;
// When sampling, validate every `sample_interval`-th entry so about
// MAX_SAMPLE_SIZE entries get checked regardless of file size.
let should_validate = if use_sampling {
let sample_interval = estimated_posting_count.max(MAX_SAMPLE_SIZE) / MAX_SAMPLE_SIZE;
posting_count % sample_interval.max(1) == 0
} else {
true
};
if should_validate {
validation_entries += 1;
// Periodic progress note every 10k validated entries.
if validation_entries > 0 && validation_entries % 10000 == 0 {
notes.push(format!("Validated {} entries so far...", validation_entries));
_last_progress_report = validation_entries;
}
if posting.vector_offset >= file_size {
notes.push(format!(
"Posting {} has vector_offset ({}) exceeding file size ({})",
posting_count, posting.vector_offset, file_size
));
corrupt_pointers += 1;
}
// The `* 4` presumably assumes 4-byte (f32) vector elements — TODO
// confirm against the vector storage element type.
// NOTE(review): `vector_offset + vector_size` could overflow u64 for
// a corrupt header with huge values; consider checked_add.
let vector_size = posting.vector_len as u64 * 4; if posting.vector_offset + vector_size > file_size {
notes.push(format!(
"Posting {} vector data ({} bytes at offset {}) exceeds file bounds",
posting_count, vector_size, posting.vector_offset
));
corrupt_pointers += 1;
}
if posting.vector_offset % 8 != 0 {
notes.push(format!(
"Posting {} vector_offset ({}) not 8-byte aligned",
posting_count, posting.vector_offset
));
corrupt_pointers += 1;
}
// After a minimum sample (100 entries), escalate to a full scan if
// the observed corruption rate is too high for sampling to be trusted.
if use_sampling && validation_entries > 100 {
let corruption_rate = corrupt_pointers as f64 / validation_entries as f64;
if corruption_rate > CORRUPTION_TOLERANCE {
notes.push(format!(
"High corruption rate detected ({:.1}%), switching to full file scan for accuracy",
corruption_rate * 100.0
));
return self
.validate_posting_storage_pointers_full(mmf, file_size, notes)
.await;
}
}
}
offset += posting_header_size;
}
if use_sampling {
notes.push(format!(
"Sampled {} out of {} posting entries, found {} pointer corruption issues",
validation_entries, posting_count, corrupt_pointers
));
} else {
notes.push(format!(
"Validated {} posting entries, found {} pointer corruption issues",
posting_count, corrupt_pointers
));
}
if corrupt_pointers == 0 {
Ok((true, "All posting storage pointers are valid".to_string()))
} else {
Ok((
false,
format!("Found {} corrupted pointers in posting storage", corrupt_pointers),
))
}
}
/// Exhaustively validate every posting entry's vector pointer.
///
/// Used directly for small files and as the fallback when sampling in
/// `validate_posting_storage_pointers` observes a high corruption rate.
/// Returns `(is_valid, summary)`; progress and totals go into `notes`.
async fn validate_posting_storage_pointers_full(
    &self,
    mmf: &MemoryMappedFile,
    file_size: u64,
    notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
    use crate::memory::StandardHeader;
    use crate::structures::PostingHeader;
    let header_size = StandardHeader::SIZE as u64;
    let posting_header_size = std::mem::size_of::<PostingHeader>() as u64;
    notes.push("Performing comprehensive full-file posting validation".to_string());
    let mut offset = header_size;
    let mut posting_count = 0;
    let mut corrupt_pointers = 0;
    while offset + posting_header_size <= file_size {
        let posting: PostingHeader = mmf.read_at(offset as usize)?;
        posting_count += 1;
        if posting_count % 50000 == 0 {
            notes.push(format!("Full scan progress: {} entries validated", posting_count));
        }
        if posting.vector_offset >= file_size {
            corrupt_pointers += 1;
        }
        // Checked addition: a corrupted offset/length pair must not wrap and
        // silently pass the bounds check (release) or panic (debug).
        let vector_size = posting.vector_len as u64 * 4;
        if posting
            .vector_offset
            .checked_add(vector_size)
            .map_or(true, |end| end > file_size)
        {
            corrupt_pointers += 1;
        }
        if posting.vector_offset % 8 != 0 {
            corrupt_pointers += 1;
        }
        offset += posting_header_size;
    }
    notes.push(format!(
        "Full validation completed: {} entries, {} corrupted pointers found",
        posting_count, corrupt_pointers
    ));
    if corrupt_pointers == 0 {
        Ok((true, "All posting storage pointers validated and valid".to_string()))
    } else {
        Ok((
            false,
            format!(
                "Found {} corrupted pointers in posting storage (full scan)",
                corrupt_pointers
            ),
        ))
    }
}
/// Check the structural pointers of a WAL segment file.
///
/// Verifies that the file is large enough to hold a `StandardHeader`, that
/// it carries the WAL magic bytes, and that `data_offset` lies inside the
/// file. Diagnostics are appended to `notes`.
async fn validate_wal_file_pointers(
    &self,
    mmf: &MemoryMappedFile,
    file_size: u64,
    notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
    use crate::memory::StandardHeader;
    let minimum_size = StandardHeader::SIZE as u64;
    if file_size < minimum_size {
        notes.push("WAL file too small for header".to_string());
        return Ok((false, "WAL file header missing".to_string()));
    }
    let header: StandardHeader = mmf.read_at(0)?;
    if header.magic != *crate::constants::magic::WAL {
        notes.push("Invalid WAL magic bytes".to_string());
        return Ok((false, "Not a valid WAL file".to_string()));
    }
    let data_offset = header.data_offset;
    if data_offset >= file_size {
        notes.push(format!(
            "WAL data_offset ({}) exceeds file size ({})",
            data_offset, file_size
        ));
        return Ok((false, "WAL data offset out of bounds".to_string()));
    }
    notes.push("WAL file pointer validation completed".to_string());
    Ok((true, "WAL file pointers are valid".to_string()))
}
/// Pointer validation for files whose specific format is unknown.
///
/// Only the generic `StandardHeader` invariants are checked: header fits in
/// the file, `data_offset` is in bounds, and `data_offset` is 8-byte
/// aligned. Diagnostics are appended to `notes`.
async fn validate_generic_file_pointers(
    &self,
    mmf: &MemoryMappedFile,
    file_size: u64,
    notes: &mut Vec<String>,
) -> Result<(bool, String), ShardexError> {
    use crate::memory::StandardHeader;
    if file_size < StandardHeader::SIZE as u64 {
        notes.push("File too small for standard header".to_string());
        return Ok((false, "File header incomplete".to_string()));
    }
    let header: StandardHeader = mmf.read_at(0)?;
    let data_offset = header.data_offset;
    if data_offset >= file_size {
        notes.push(format!(
            "Data offset ({}) exceeds file size ({})",
            data_offset, file_size
        ));
        return Ok((false, "Data offset out of bounds".to_string()));
    }
    if data_offset % 8 != 0 {
        notes.push(format!("Data offset ({}) not 8-byte aligned", data_offset));
        return Ok((false, "Data offset misaligned".to_string()));
    }
    notes.push("Generic file pointer validation completed".to_string());
    Ok((true, "Generic file pointers appear valid".to_string()))
}
/// Attempt to rewrite the `vector_data_offset` field of a vector storage
/// file header in place.
///
/// The new offset must be at least the header size, strictly inside the
/// file, and 8-byte aligned. After writing, the header is read back to
/// verify the change took effect. Returns `Ok(true)` only on verified
/// success; all failure modes are reported through `notes` and `Ok(false)`
/// rather than propagated as `Err`.
async fn repair_vector_data_offset(
&self,
file_path: &Path,
new_offset: u64,
notes: &mut Vec<String>,
) -> Result<bool, ShardexError> {
use crate::memory::MemoryMappedFile;
use crate::vector_storage::VectorStorageHeader;
notes.push(format!(
"Attempting to repair vector_data_offset to {} in file: {}",
new_offset,
file_path.display()
));
// Open writable; an open failure is a soft failure recorded in notes.
let mut writable_mmf = match MemoryMappedFile::open_read_write(file_path) {
Ok(mmf) => mmf,
Err(e) => {
notes.push(format!("Failed to open file in read-write mode: {}", e));
return Ok(false);
}
};
let mut header: VectorStorageHeader = match writable_mmf.read_at(0) {
Ok(h) => h,
Err(e) => {
notes.push(format!("Failed to read vector storage header: {}", e));
return Ok(false);
}
};
let original_offset = header.vector_data_offset;
notes.push(format!("Original vector_data_offset: {}", original_offset));
// Sanity checks before mutating anything: the new offset must not land
// inside the header region...
if new_offset < std::mem::size_of::<VectorStorageHeader>() as u64 {
notes.push(format!(
"New offset {} is too small (must be >= header size {})",
new_offset,
std::mem::size_of::<VectorStorageHeader>()
));
return Ok(false);
}
// ...must be strictly inside the file (NOTE(review): `>=` also rejects
// offset == len, which a zero-vector file might legitimately use — confirm),
if new_offset as usize >= writable_mmf.len() {
notes.push(format!(
"New offset {} exceeds file size {}",
new_offset,
writable_mmf.len()
));
return Ok(false);
}
// ...and must satisfy the 8-byte alignment the format requires.
if new_offset % 8 != 0 {
notes.push(format!("New offset {} is not 8-byte aligned", new_offset));
return Ok(false);
}
header.vector_data_offset = new_offset;
match writable_mmf.write_at(0, &header) {
Ok(()) => {
notes.push(format!(
"Successfully updated vector_data_offset from {} to {}",
original_offset, new_offset
));
// Read the header back to confirm the write actually landed.
match writable_mmf.read_at::<VectorStorageHeader>(0) {
Ok(verified_header) => {
if verified_header.vector_data_offset == new_offset {
notes.push("Repair verification successful - header updated correctly".to_string());
Ok(true)
} else {
notes.push(format!(
"Repair verification failed: expected {}, got {}",
new_offset, verified_header.vector_data_offset
));
Ok(false)
}
}
Err(e) => {
notes.push(format!("Failed to verify repair: {}", e));
Ok(false)
}
}
}
Err(e) => {
notes.push(format!("Failed to write updated header: {}", e));
Ok(false)
}
}
}
/// Magic byte sequences accepted for production Shardex files; any other
/// magic is treated as unrecognized. (`'static` is implied for references
/// in `const` items, so the explicit lifetimes were redundant.)
const VALID_PRODUCTION_MAGIC_BYTES: &[&[u8; 4]] = &[
    crate::constants::magic::WAL,
    crate::constants::magic::VECTOR_STORAGE,
    crate::constants::magic::POSTING_STORAGE,
    crate::constants::magic::TEXT_INDEX,
    crate::constants::magic::TEXT_DATA,
];
/// True when `magic` matches one of the recognized production file formats.
fn is_valid_magic(&self, magic: &[u8; 4]) -> bool {
    Self::VALID_PRODUCTION_MAGIC_BYTES
        .iter()
        .any(|&known| known == magic)
}
/// Map magic bytes to a human-readable file-type label, or `None` when the
/// magic is not a recognized Shardex format.
fn get_file_type_from_magic(&self, magic: &[u8; 4]) -> Option<&'static str> {
    // Plain conditional chain: the previous `match *magic` consisted solely
    // of guarded wildcard arms, which defeats the purpose of `match`.
    if magic == crate::constants::magic::WAL {
        Some("WAL (Write-Ahead Log)")
    } else if magic == crate::constants::magic::VECTOR_STORAGE {
        Some("Vector Storage")
    } else if magic == crate::constants::magic::POSTING_STORAGE {
        Some("Posting Storage")
    } else if magic == crate::constants::magic::TEXT_INDEX {
        Some("Text Index")
    } else if magic == crate::constants::magic::TEXT_DATA {
        Some("Text Data")
    } else {
        None
    }
}
/// Heuristic: truncated data is considered reconstructible only when the
/// missing region is small (under 64 KiB) and overall severity is low.
async fn can_reconstruct_truncated_data(&self, corruption: &CorruptionReport) -> bool {
    corruption
        .corruption_size
        .map_or(false, |size| size < 65536 && corruption.severity < 0.5)
}
/// Heuristic: a file is treated as a rebuildable index component when its
/// name contains one of the known index-related keywords.
async fn can_rebuild_index_structure(&self, corruption: &CorruptionReport) -> bool {
    let file_name = corruption
        .file_path
        .file_name()
        .map(|n| n.to_string_lossy())
        .unwrap_or_default();
    ["index", "idx", "bloom", "posting"]
        .iter()
        .any(|&keyword| file_name.contains(keyword))
}
}
/// Stable string name for a `ComponentType`, suitable for reports and logs.
fn component_type_name(component_type: ComponentType) -> &'static str {
match component_type {
ComponentType::VectorStorage => "VectorStorage",
ComponentType::PostingStorage => "PostingStorage",
ComponentType::WalSegments => "WalSegments",
ComponentType::BloomFilters => "BloomFilters",
ComponentType::ShardexSegments => "ShardexSegments",
ComponentType::CrossReferences => "CrossReferences",
}
}
impl ValidationResult {
    /// Build a result for a file that passed validation.
    pub fn success(validation_time: Duration, bytes_validated: u64, data_checksum: u32) -> Self {
        Self {
            is_valid: true,
            corruption_report: None,
            validation_time,
            bytes_validated,
            data_checksum,
        }
    }

    /// Build a result for a file that failed validation, carrying the
    /// corruption details for the caller.
    pub fn failure(
        corruption_report: CorruptionReport,
        validation_time: Duration,
        bytes_validated: u64,
        data_checksum: u32,
    ) -> Self {
        Self {
            is_valid: false,
            corruption_report: Some(corruption_report),
            validation_time,
            bytes_validated,
            data_checksum,
        }
    }

    /// Whether the validated file was free of detected corruption.
    pub fn is_valid(&self) -> bool {
        self.is_valid
    }

    /// Corruption details; present only when `is_valid()` is false.
    pub fn corruption_report(&self) -> Option<&CorruptionReport> {
        self.corruption_report.as_ref()
    }

    /// Number of bytes examined during validation.
    pub fn bytes_validated(&self) -> u64 {
        self.bytes_validated
    }

    /// Checksum recorded for the validated data region.
    pub fn data_checksum(&self) -> u32 {
        self.data_checksum
    }
}
impl IntegrityManager {
/// Create a manager with the given configuration and empty monitoring state.
pub fn new(config: IntegrityConfig) -> Self {
    Self {
        config,
        stats: Default::default(),
        validation_history: Default::default(),
        monitored_files: Default::default(),
    }
}
/// Convenience constructor using `IntegrityConfig::default()`.
pub fn with_default_config() -> Self {
    Self::new(Default::default())
}
/// Validate an open memory-mapped file, dispatching on its magic bytes.
///
/// Posting-storage and vector-storage files get format-specific checks;
/// everything else is validated against the generic `FileHeader` checksum.
/// The returned `CorruptionReport` (if any) carries a placeholder path of
/// "unknown" — `validate_file_path` patches in the real path afterwards.
pub fn validate_file(&mut self, mmf: &MemoryMappedFile) -> Result<ValidationResult, ShardexError> {
let start_time = Instant::now();
let file_data = mmf.as_slice();
// Truncation: the file must at least hold the generic header.
if file_data.len() < FileHeader::SIZE {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"), corruption_offset: None,
corruption_size: Some(file_data.len() as u64),
description: format!(
"File too small: {} bytes, expected at least {} bytes for header",
file_data.len(),
FileHeader::SIZE
),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
if file_data.len() >= 4 {
// Dispatch on magic bytes to the format-specific validators.
let magic = &file_data[0..4];
match magic {
x if x == magic::POSTING_STORAGE => {
self.validate_posting_storage_file(mmf, start_time)
}
x if x == magic::VECTOR_STORAGE => {
self.validate_vector_storage_file(mmf, start_time)
}
_ => {
// Unknown format: fall back to the generic FileHeader checksum.
let header: FileHeader = mmf
.read_at(0)
.map_err(|e| ShardexError::Corruption(format!("Failed to read file header: {}", e)))?;
let data_start = FileHeader::SIZE;
let data_portion = &file_data[data_start..];
if let Err(e) = header.validate_checksum(data_portion) {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(data_start as u64),
corruption_size: Some(data_portion.len() as u64),
description: format!("Header checksum validation failed: {}", e),
recovery_recommendations: vec![
"Data may be corrupted. Check for partial writes.".to_string(),
"Restore from known good backup if available.".to_string(),
"Run detailed corruption analysis if recovery is needed.".to_string(),
],
severity: 0.8,
is_recoverable: self.config.enable_recovery,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
header.checksum,
));
}
let calculated_checksum = header.checksum;
// Success path: update aggregate statistics.
self.stats.total_validations += 1;
self.stats.total_validation_time += start_time.elapsed();
self.stats.total_bytes_validated += file_data.len() as u64;
self.stats.last_validation = Some(SystemTime::now());
Ok(ValidationResult::success(
start_time.elapsed(),
file_data.len() as u64,
calculated_checksum,
))
}
}
} else {
// NOTE(review): this branch looks unreachable — any file shorter than
// FileHeader::SIZE already returned above, and FileHeader::SIZE is
// presumably >= 4; confirm before removing.
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"),
corruption_offset: None,
corruption_size: Some(file_data.len() as u64),
description: "File too small to contain magic bytes".to_string(),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
))
}
}
/// Validate a posting storage file: size, header invariants, declared data
/// extent, and the checksum over the data region.
///
/// All corruption reports use a placeholder path of "unknown"; the caller
/// (`validate_file_path`) replaces it with the real path.
fn validate_posting_storage_file(
&mut self,
mmf: &MemoryMappedFile,
start_time: Instant,
) -> Result<ValidationResult, ShardexError> {
use crate::posting_storage::PostingStorageHeader;
let file_data = mmf.as_slice();
// Truncation: file must at least hold the posting-storage header.
if file_data.len() < PostingStorageHeader::SIZE {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"),
corruption_offset: None,
corruption_size: Some(file_data.len() as u64),
description: format!(
"File too small: {} bytes, expected at least {} bytes for PostingStorage header",
file_data.len(),
PostingStorageHeader::SIZE
),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
let header: PostingStorageHeader = mmf
.read_at(0)
.map_err(|e| ShardexError::Corruption(format!("Failed to read PostingStorage header: {}", e)))?;
// Header self-validation (internal invariants; see PostingStorageHeader).
if let Err(e) = header.validate() {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(0),
corruption_size: Some(PostingStorageHeader::SIZE as u64),
description: format!("PostingStorage header validation failed: {}", e),
recovery_recommendations: vec![
"Header may be corrupted. Check file system integrity.".to_string(),
"Restore from known good backup if available.".to_string(),
],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
let data_start = PostingStorageHeader::SIZE;
// NOTE(review): assumes calculate_file_size() >= PostingStorageHeader::SIZE;
// a corrupt header could make this subtraction underflow (panic in debug,
// wrap in release) — confirm calculate_file_size()'s invariant.
let data_size = header.calculate_file_size() - PostingStorageHeader::SIZE;
if data_start + data_size > file_data.len() {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(data_start as u64),
corruption_size: Some(data_size as u64),
description: "PostingStorage file truncated before end of data section".to_string(),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
// Checksum covers exactly the declared data region.
let data_portion = &file_data[data_start..data_start + data_size];
if let Err(e) = header.file_header.validate_checksum(data_portion) {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(data_start as u64),
corruption_size: Some(data_size as u64),
description: format!("PostingStorage data checksum validation failed: {}", e),
recovery_recommendations: vec![
"Data may be corrupted. Check for partial writes.".to_string(),
"Restore from known good backup if available.".to_string(),
],
severity: 0.8,
is_recoverable: self.config.enable_recovery,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
header.file_header.checksum,
));
}
// Success: update aggregate statistics.
self.stats.total_validations += 1;
self.stats.total_validation_time += start_time.elapsed();
self.stats.total_bytes_validated += file_data.len() as u64;
self.stats.last_validation = Some(SystemTime::now());
Ok(ValidationResult::success(
start_time.elapsed(),
file_data.len() as u64,
header.file_header.checksum,
))
}
/// Validate a vector storage file: size, header invariants, vector-region
/// bounds (including SIMD alignment padding), and the data checksum.
///
/// Corruption reports use a placeholder path of "unknown"; the caller
/// (`validate_file_path`) replaces it with the real path.
fn validate_vector_storage_file(
&mut self,
mmf: &MemoryMappedFile,
start_time: Instant,
) -> Result<ValidationResult, ShardexError> {
use crate::vector_storage::VectorStorageHeader;
let file_data = mmf.as_slice();
// Truncation: file must at least hold the vector-storage header.
if file_data.len() < VectorStorageHeader::SIZE {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"),
corruption_offset: None,
corruption_size: Some(file_data.len() as u64),
description: format!(
"File too small: {} bytes, expected at least {} bytes for VectorStorage header",
file_data.len(),
VectorStorageHeader::SIZE
),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
let header: VectorStorageHeader = mmf
.read_at(0)
.map_err(|e| ShardexError::Corruption(format!("Failed to read VectorStorage header: {}", e)))?;
// Header self-validation (internal invariants; see VectorStorageHeader).
if let Err(e) = header.validate() {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::HeaderCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(0),
corruption_size: Some(VectorStorageHeader::SIZE as u64),
description: format!("VectorStorage header validation failed: {}", e),
recovery_recommendations: vec![
"Header may be corrupted. Check file system integrity.".to_string(),
"Restore from known good backup if available.".to_string(),
],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
let vector_data_start = header.vector_data_offset as usize;
// Expected region: capacity * dimension f32 values, rounded up to the
// header's SIMD alignment. NOTE(review): a corrupt header with huge
// capacity/dimension could overflow this multiplication (panic in debug,
// wrap in release) before the bounds check — consider checked_mul.
let vector_data_size =
(header.capacity as usize) * (header.vector_dimension as usize) * std::mem::size_of::<f32>();
let aligned_size = Self::align_size(vector_data_size, header.simd_alignment as usize);
if vector_data_start + aligned_size > file_data.len() {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::FileTruncation,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(vector_data_start as u64),
corruption_size: Some(aligned_size as u64),
description: "VectorStorage file truncated before end of vector data".to_string(),
recovery_recommendations: vec!["File may be truncated. Restore from backup.".to_string()],
severity: 1.0,
is_recoverable: false,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
0,
));
}
// Checksum covers the aligned region, i.e. including alignment padding.
let vector_data = &file_data[vector_data_start..vector_data_start + aligned_size];
if let Err(e) = header.file_header.validate_checksum(vector_data) {
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("unknown"),
corruption_offset: Some(vector_data_start as u64),
corruption_size: Some(aligned_size as u64),
description: format!("VectorStorage data checksum validation failed: {}", e),
recovery_recommendations: vec![
"Vector data may be corrupted. Check for partial writes.".to_string(),
"Restore from known good backup if available.".to_string(),
],
severity: 0.8,
is_recoverable: self.config.enable_recovery,
detected_at: SystemTime::now(),
};
return Ok(ValidationResult::failure(
corruption_report,
start_time.elapsed(),
file_data.len() as u64,
header.file_header.checksum,
));
}
// Success: update aggregate statistics.
self.stats.total_validations += 1;
self.stats.total_validation_time += start_time.elapsed();
self.stats.total_bytes_validated += file_data.len() as u64;
self.stats.last_validation = Some(SystemTime::now());
Ok(ValidationResult::success(
start_time.elapsed(),
file_data.len() as u64,
header.file_header.checksum,
))
}
/// Round `size` up to the next multiple of `alignment`.
///
/// `alignment` must be a nonzero power of two: the bit trick below is only
/// correct under that precondition (and `alignment == 0` would underflow),
/// so it is asserted in debug builds.
fn align_size(size: usize, alignment: usize) -> usize {
    debug_assert!(
        alignment.is_power_of_two(),
        "alignment must be a nonzero power of two, got {}",
        alignment
    );
    (size + alignment - 1) & !(alignment - 1)
}
/// Open `path` read-only, validate it, and record the validation timestamp.
///
/// Any corruption report is patched with the real path, since
/// `validate_file` itself only knows the mapped bytes.
pub fn validate_file_path<P: AsRef<Path>>(&mut self, path: P) -> Result<ValidationResult, ShardexError> {
    let path = path.as_ref();
    let mapped = MemoryMappedFile::open_read_only(path)?;
    let mut result = self.validate_file(&mapped)?;
    if let Some(report) = result.corruption_report.as_mut() {
        report.file_path = path.to_path_buf();
    }
    self.validation_history
        .insert(path.to_path_buf(), SystemTime::now());
    Ok(result)
}
/// Validate `path` once and begin tracking its health for periodic checks.
pub fn add_to_monitoring<P: AsRef<Path>>(&mut self, path: P) -> Result<(), ShardexError> {
    let path = path.as_ref().to_path_buf();
    let result = self.validate_file_path(&path)?;
    let healthy = result.is_valid();
    let state = FileMonitoringState {
        last_known_checksum: result.data_checksum(),
        last_validation: SystemTime::now(),
        // An initial failure counts as one consecutive failure.
        consecutive_failures: if healthy { 0 } else { 1 },
        is_healthy: healthy,
    };
    self.monitored_files.insert(path, state);
    Ok(())
}
/// Stop tracking `path`: clears both its monitoring state and its
/// validation-history entry.
pub fn remove_from_monitoring<P: AsRef<Path>>(&mut self, path: P) {
    let path = path.as_ref();
    self.monitored_files.remove(path);
    self.validation_history.remove(path);
}
/// Re-validate every monitored file whose validation interval has elapsed.
///
/// Returns the results for the files that were checked. Per-file health
/// state and global corruption statistics are updated as a side effect.
/// Returns an empty list when periodic validation is disabled.
pub fn perform_periodic_validation(&mut self) -> Result<Vec<ValidationResult>, ShardexError> {
    // `let else` instead of is_none()/unwrap(): same pattern as
    // `needs_validation`, and no panic path.
    let Some(interval) = self.config.periodic_validation_interval else {
        return Ok(Vec::new());
    };
    let now = SystemTime::now();
    // Collect paths first so the mutable validation pass below does not
    // alias the iteration over `monitored_files`.
    let files_to_validate: Vec<PathBuf> = self
        .monitored_files
        .iter()
        .filter(|(_, state)| {
            now.duration_since(state.last_validation)
                .unwrap_or(Duration::MAX)
                >= interval
        })
        .map(|(path, _)| path.clone())
        .collect();
    let mut results = Vec::with_capacity(files_to_validate.len());
    for file_path in files_to_validate {
        let result = self.validate_file_path(&file_path)?;
        if let Some(state) = self.monitored_files.get_mut(&file_path) {
            state.last_validation = now;
            if result.is_valid() {
                state.consecutive_failures = 0;
                state.is_healthy = true;
                state.last_known_checksum = result.data_checksum();
            } else {
                state.consecutive_failures += 1;
                state.is_healthy = false;
                self.stats.corruptions_detected += 1;
            }
        }
        results.push(result);
    }
    Ok(results)
}
/// Whether `path` is due for (re-)validation under the configured interval.
///
/// Unmonitored files always need validation; when periodic validation is
/// disabled, nothing does.
pub fn needs_validation<P: AsRef<Path>>(&self, path: P) -> bool {
    let interval = match self.config.periodic_validation_interval {
        Some(interval) => interval,
        None => return false,
    };
    match self.monitored_files.get(path.as_ref()) {
        Some(state) => {
            SystemTime::now()
                .duration_since(state.last_validation)
                .unwrap_or(Duration::MAX)
                >= interval
        }
        None => true,
    }
}
/// Aggregate validation statistics collected so far.
pub fn stats(&self) -> &IntegrityStats {
    &self.stats
}
/// Health flag for a monitored file; `None` if the file is not monitored.
pub fn file_health_status<P: AsRef<Path>>(&self, path: P) -> Option<bool> {
    Some(self.monitored_files.get(path.as_ref())?.is_healthy)
}
/// Paths of all files currently under integrity monitoring.
pub fn monitored_files(&self) -> Vec<&PathBuf> {
    self.monitored_files.iter().map(|(path, _)| path).collect()
}
/// Dispatch recovery for a corruption report to the type-specific handler.
///
/// Short-circuits to `Ok(false)` when recovery is disabled in the config or
/// the report itself is marked unrecoverable.
pub fn attempt_recovery(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    let recovery_allowed = self.config.enable_recovery && corruption_report.is_recoverable;
    if !recovery_allowed {
        return Ok(false);
    }
    match corruption_report.corruption_type {
        CorruptionType::DataCorruption => {
            tracing::info!(
                "Attempting data corruption recovery for file: {:?}",
                corruption_report.file_path
            );
            self.recover_data_corruption(corruption_report)
        }
        CorruptionType::HeaderCorruption => {
            // Header corruption is logged at `warn`; the handler itself
            // reports that recovery needs a backup.
            tracing::warn!(
                "Header corruption detected in file: {:?} - attempting recovery",
                corruption_report.file_path
            );
            self.recover_header_corruption(corruption_report)
        }
        CorruptionType::FileTruncation => {
            tracing::info!(
                "Attempting file truncation recovery for file: {:?}",
                corruption_report.file_path
            );
            self.recover_file_truncation(corruption_report)
        }
        CorruptionType::StructuralInconsistency => {
            tracing::info!(
                "Attempting structural inconsistency recovery for file: {:?}",
                corruption_report.file_path
            );
            self.recover_structural_inconsistency(corruption_report)
        }
        CorruptionType::CrossValidationFailure => {
            tracing::info!(
                "Attempting cross-validation failure recovery for file: {:?}",
                corruption_report.file_path
            );
            self.recover_cross_validation_failure(corruption_report)
        }
        CorruptionType::PartialCorruption => {
            tracing::info!(
                "Attempting partial corruption recovery for file: {:?}",
                corruption_report.file_path
            );
            self.recover_partial_corruption(corruption_report)
        }
    }
}
/// Heuristic recovery check for data-region corruption: only small regions
/// (< 4 KiB) past the start of the file with moderate severity are
/// considered recoverable.
fn recover_data_corruption(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    let offset = corruption_report.corruption_offset.unwrap_or(0);
    if offset > 0 && corruption_report.severity < 0.8 {
        tracing::info!("Data corruption at recoverable offset {}", offset);
        match corruption_report.corruption_size {
            Some(size) if size < 4096 => {
                tracing::info!("Small corruption region ({}b), recovery possible", size);
                return Ok(true);
            }
            _ => {}
        }
    }
    tracing::warn!("Data corruption too severe for automatic recovery");
    Ok(false)
}
/// Header-corruption handler: recognizes the storage flavor from the file
/// name, but currently cannot rebuild any header, so every path returns
/// `Ok(false)`.
fn recover_header_corruption(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    if let Some(file_name) = corruption_report.file_path.file_name() {
        let file_name = file_name.to_string_lossy();
        let looks_like_vector = file_name.contains("vector") || file_name.contains(".vec");
        let looks_like_posting = file_name.contains("posting") || file_name.contains(".post");
        if looks_like_vector {
            tracing::info!("Detected vector storage file, may be able to rebuild basic header");
            return Ok(false);
        }
        if looks_like_posting {
            tracing::info!("Detected posting storage file, may be able to rebuild basic header");
            return Ok(false);
        }
    }
    tracing::error!("Header corruption recovery not possible without backup");
    Ok(false)
}
/// Truncation handler: recovery is only considered when the cut landed on a
/// 4 KiB page boundary and the reported severity is minor.
fn recover_file_truncation(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    match corruption_report.corruption_offset {
        Some(offset) if offset % 4096 == 0 => {
            tracing::info!("Truncation at page boundary, attempting recovery");
            if corruption_report.severity < 0.5 {
                tracing::info!("Minor truncation detected, recovery may be possible");
                return Ok(true);
            }
        }
        _ => {}
    }
    tracing::warn!("File truncation too severe for automatic recovery");
    Ok(false)
}
/// Structural-inconsistency handler: index-like components (index, idx,
/// bloom, posting files) are considered rebuildable; anything else is not.
fn recover_structural_inconsistency(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    if corruption_report.description.contains("index") {
        tracing::info!("Index structure corruption detected, attempting rebuild");
        let file_name = corruption_report
            .file_path
            .file_name()
            .map(|n| n.to_string_lossy())
            .unwrap_or_default();
        let rebuildable = ["index", "idx", "bloom", "posting"]
            .iter()
            .any(|&keyword| file_name.contains(keyword));
        if rebuildable {
            tracing::info!("Rebuildable index component detected");
            return Ok(true);
        }
    }
    tracing::warn!("Structural inconsistency cannot be automatically recovered");
    Ok(false)
}
/// Cross-validation-failure handler: count/capacity mismatches are treated
/// as recoverable (indices can be rebuilt); dimension mismatches are not;
/// anything else is optimistically reported as recoverable.
fn recover_cross_validation_failure(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    let description = &corruption_report.description;
    if description.contains("mismatch") {
        tracing::info!("Cross-validation mismatch detected, attempting resolution");
        if description.contains("count") || description.contains("capacity") {
            tracing::info!("Count/capacity mismatch - may be recoverable by rebuilding indices");
            return Ok(true);
        }
        if description.contains("dimension") {
            tracing::warn!("Dimension mismatch requires configuration changes");
            return Ok(false);
        }
    }
    tracing::info!("Cross-validation failure may be recoverable");
    Ok(true)
}
/// Partial-corruption handler: small regions (< 64 KiB, moderate severity)
/// or damage located past the first 1 KiB (outside the critical header
/// area) are considered recoverable.
fn recover_partial_corruption(&mut self, corruption_report: &CorruptionReport) -> Result<bool, ShardexError> {
    if let (Some(offset), Some(size)) = (corruption_report.corruption_offset, corruption_report.corruption_size) {
        tracing::info!("Partial corruption at offset {} size {} bytes", offset, size);
        let small_region = size < 64 * 1024 && corruption_report.severity < 0.6;
        let non_critical_region = offset > 1024 && corruption_report.severity < 0.8;
        if small_region {
            tracing::info!("Small partial corruption, recovery likely possible");
            return Ok(true);
        }
        if non_critical_region {
            tracing::info!("Partial corruption in non-critical region, recovery possible");
            return Ok(true);
        }
    }
    tracing::warn!("Partial corruption too extensive for automatic recovery");
    Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ShardexConfig;
use crate::identifiers::DocumentId;
use crate::memory::MemoryMappedFile;
use std::fs;
use tempfile::{NamedTempFile, TempDir};
#[test]
fn test_integrity_config_default() {
// Defaults (see IntegrityConfig::default): periodic validation enabled,
// zero corruption tolerance, recovery on with up to 3 attempts.
let config = IntegrityConfig::default();
assert!(config.periodic_validation_interval.is_some());
assert_eq!(config.corruption_tolerance, 0.0);
assert!(config.enable_recovery);
assert_eq!(config.max_recovery_attempts, 3);
}
#[test]
fn test_validation_result_success() {
// A success result exposes its inputs and carries no corruption report.
let result = ValidationResult::success(Duration::from_millis(100), 1024, 0x12345678);
assert!(result.is_valid());
assert!(result.corruption_report().is_none());
assert_eq!(result.bytes_validated(), 1024);
assert_eq!(result.data_checksum(), 0x12345678);
}
#[test]
fn test_validation_result_failure() {
// A failure result preserves the attached corruption report verbatim.
let corruption_report = CorruptionReport {
corruption_type: CorruptionType::DataCorruption,
file_path: PathBuf::from("/test/file"),
corruption_offset: Some(100),
corruption_size: Some(50),
description: "Test corruption".to_string(),
recovery_recommendations: vec!["Test recovery".to_string()],
severity: 0.5,
is_recoverable: true,
detected_at: SystemTime::now(),
};
let result = ValidationResult::failure(corruption_report, Duration::from_millis(200), 1024, 0x87654321);
assert!(!result.is_valid());
assert!(result.corruption_report().is_some());
let report = result.corruption_report().unwrap();
assert_eq!(report.corruption_type, CorruptionType::DataCorruption);
assert_eq!(report.severity, 0.5);
assert!(report.is_recoverable);
}
#[test]
fn test_integrity_manager_creation() {
// A fresh manager starts with zeroed stats and no monitored files.
let config = IntegrityConfig::default();
let manager = IntegrityManager::new(config);
assert_eq!(manager.stats().total_validations, 0);
assert_eq!(manager.monitored_files().len(), 0);
}
#[test]
fn test_file_validation_success() {
    use crate::posting_storage::PostingStorage;
    // A freshly created posting storage file with a single entry must
    // validate cleanly and be reflected in the manager's statistics.
    let temp_dir = TempDir::new().unwrap();
    let storage_path = temp_dir.path().join("postings.dat");
    let mut storage = PostingStorage::create(&storage_path, 10).unwrap();
    let doc_id = DocumentId::new();
    storage.add_posting(doc_id, 100, 50).unwrap();
    storage.sync().unwrap();
    let mut manager = IntegrityManager::with_default_config();
    let result = manager.validate_file(storage.memory_mapped_file()).unwrap();
    // Surface the corruption report on failure so the cause shows up in the
    // test output (replaces the old debug println! scaffolding).
    if let Some(report) = result.corruption_report() {
        panic!("expected valid file, got corruption report: {:?}", report);
    }
    assert!(result.is_valid());
    assert_eq!(manager.stats().total_validations, 1);
    assert!(manager.stats().total_bytes_validated > 0);
}
#[test]
fn test_file_validation_truncation() {
// An 8-byte file is smaller than any header and must be reported as an
// unrecoverable FileTruncation with maximum severity.
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("truncated.dat");
let mmf = MemoryMappedFile::create(&file_path, 8).unwrap();
let mut manager = IntegrityManager::with_default_config();
let result = manager.validate_file(&mmf).unwrap();
assert!(!result.is_valid());
let report = result.corruption_report().unwrap();
assert_eq!(report.corruption_type, CorruptionType::FileTruncation);
assert_eq!(report.severity, 1.0);
assert!(!report.is_recoverable);
}
#[test]
fn test_file_validation_checksum_mismatch() {
// Write a header whose checksum covers `original_data`, then store
// different bytes: validation must flag DataCorruption (severity 0.8).
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("corrupted.dat");
let mut mmf = MemoryMappedFile::create(&file_path, 1024).unwrap();
let original_data = vec![42u8; 100];
let corrupted_data = vec![99u8; 100];
let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &original_data);
mmf.write_at(0, &header).unwrap();
mmf.write_slice_at(FileHeader::SIZE, &corrupted_data)
.unwrap();
mmf.sync().unwrap();
let mut manager = IntegrityManager::with_default_config();
let result = manager.validate_file(&mmf).unwrap();
assert!(!result.is_valid());
let report = result.corruption_report().unwrap();
assert_eq!(report.corruption_type, CorruptionType::DataCorruption);
assert_eq!(report.severity, 0.8);
}
#[test]
fn test_file_path_validation() {
// validate_file_path must succeed on a consistent file and record the
// path in the manager's validation history.
let temp_file = NamedTempFile::new().unwrap();
let file_path = temp_file.path();
let mut mmf = MemoryMappedFile::create(file_path, 1024).unwrap();
// Data fills the file exactly so the generic checksum covers everything
// after the header.
let data_size = 1024 - FileHeader::SIZE; let test_data = vec![42u8; data_size];
let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &test_data);
mmf.write_at(0, &header).unwrap();
mmf.write_slice_at(FileHeader::SIZE, &test_data).unwrap();
mmf.sync().unwrap();
drop(mmf);
let mut manager = IntegrityManager::with_default_config();
let result = manager.validate_file_path(file_path).unwrap();
assert!(result.is_valid());
assert!(manager
.validation_history
.contains_key(&file_path.to_path_buf()));
}
#[test]
fn test_monitoring_add_remove() {
    let temp_file = NamedTempFile::new().unwrap();
    let file_path = temp_file.path();

    // Write a valid header + payload so the file passes the initial validation
    // performed when it is added to monitoring.
    {
        let payload = vec![42u8; 1024 - FileHeader::SIZE];
        let mut file = MemoryMappedFile::create(file_path, 1024).unwrap();
        let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &payload);
        file.write_at(0, &header).unwrap();
        file.write_slice_at(FileHeader::SIZE, &payload).unwrap();
        file.sync().unwrap();
    }

    let mut manager = IntegrityManager::with_default_config();

    // Adding registers the file and records a healthy status.
    manager.add_to_monitoring(file_path).unwrap();
    assert_eq!(manager.monitored_files().len(), 1);
    assert!(manager.file_health_status(file_path).unwrap());

    // Removal clears both the monitored set and the health entry.
    manager.remove_from_monitoring(file_path);
    assert_eq!(manager.monitored_files().len(), 0);
    assert!(manager.file_health_status(file_path).is_none());
}
#[test]
fn test_needs_validation() {
    let temp_file = NamedTempFile::new().unwrap();
    let file_path = temp_file.path();

    // Minimal valid file so add_to_monitoring succeeds.
    {
        let payload = vec![42u8; 100];
        let mut file = MemoryMappedFile::create(file_path, 1024).unwrap();
        let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &payload);
        file.write_at(0, &header).unwrap();
        file.write_slice_at(FileHeader::SIZE, &payload).unwrap();
        file.sync().unwrap();
    }

    // Very short revalidation interval so the elapsed-interval path can be
    // exercised with a tiny sleep.
    let mut manager = IntegrityManager::new(IntegrityConfig {
        periodic_validation_interval: Some(Duration::from_millis(1)),
        ..Default::default()
    });

    // Files unknown to the manager always need validation.
    assert!(manager.needs_validation(file_path));
    // A freshly monitored file was just validated, so it does not.
    manager.add_to_monitoring(file_path).unwrap();
    assert!(!manager.needs_validation(file_path));
    // Once the interval has elapsed the file is due again.
    std::thread::sleep(Duration::from_millis(10));
    assert!(manager.needs_validation(file_path));
}
#[test]
fn test_corruption_report_serialization() {
    // Round-trip a CorruptionReport through JSON and spot-check the fields.
    let original = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: PathBuf::from("/test/file.dat"),
        corruption_offset: Some(1024),
        corruption_size: Some(256),
        description: "Test corruption for serialization".to_string(),
        recovery_recommendations: vec![
            "Restore from backup".to_string(),
            "Run data recovery tools".to_string(),
        ],
        severity: 0.75,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    let json = serde_json::to_string(&original).unwrap();
    let round_tripped: CorruptionReport = serde_json::from_str(&json).unwrap();

    assert_eq!(original.corruption_type, round_tripped.corruption_type);
    assert_eq!(original.file_path, round_tripped.file_path);
    assert_eq!(original.corruption_offset, round_tripped.corruption_offset);
    assert_eq!(original.severity, round_tripped.severity);
    assert_eq!(original.is_recoverable, round_tripped.is_recoverable);
}
#[test]
fn test_storage_integrity_validation() {
    use crate::identifiers::DocumentId;
    use crate::posting_storage::PostingStorage;

    let tmp = TempDir::new().unwrap();
    let path = tmp.path().join("postings.dat");

    // Populate a posting storage with a single entry and flush it.
    let mut postings = PostingStorage::create(&path, 10).unwrap();
    postings.add_posting(DocumentId::new(), 100, 50).unwrap();
    postings.sync().unwrap();

    // Both the generic file validator and the storage's own check must pass.
    let mut integrity = IntegrityManager::with_default_config();
    let outcome = integrity.validate_file(postings.memory_mapped_file()).unwrap();
    assert!(outcome.is_valid());
    assert!(postings.validate_integrity().is_ok());
}
#[test]
fn test_vector_storage_integrity_validation() {
    use crate::vector_storage::VectorStorage;

    let tmp = TempDir::new().unwrap();
    let path = tmp.path().join("vectors.dat");

    // Store one 3-dimensional vector and flush it to disk.
    let mut vectors = VectorStorage::create(&path, 3, 5).unwrap();
    let v = vec![1.0, 2.0, 3.0];
    vectors.add_vector(&v).unwrap();
    vectors.sync().unwrap();

    // Both external validation and the storage's self-check must pass.
    let mut integrity = IntegrityManager::with_default_config();
    let outcome = integrity.validate_file(vectors.memory_mapped_file()).unwrap();
    assert!(outcome.is_valid());
    assert!(vectors.validate_integrity().is_ok());
}
#[test]
fn test_periodic_validation_workflow() {
    let tmp = TempDir::new().unwrap();
    let file_path = tmp.path().join("periodic_test.dat");

    // Valid header + payload, then unmap before monitoring.
    {
        let payload = vec![42u8; 1024 - FileHeader::SIZE];
        let mut file = MemoryMappedFile::create(&file_path, 1024).unwrap();
        let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &payload);
        file.write_at(0, &header).unwrap();
        file.write_slice_at(FileHeader::SIZE, &payload).unwrap();
        file.sync().unwrap();
    }

    let mut manager = IntegrityManager::new(IntegrityConfig {
        periodic_validation_interval: Some(Duration::from_millis(10)),
        ..Default::default()
    });

    // Newly monitored files are healthy and not yet due.
    manager.add_to_monitoring(&file_path).unwrap();
    assert!(manager.file_health_status(&file_path).unwrap());
    assert!(!manager.needs_validation(&file_path));

    // After the interval elapses the file is due; a periodic pass validates
    // exactly that one file and resets its schedule.
    std::thread::sleep(Duration::from_millis(20));
    assert!(manager.needs_validation(&file_path));
    let results = manager.perform_periodic_validation().unwrap();
    assert_eq!(results.len(), 1);
    assert!(results[0].is_valid());
    assert!(!manager.needs_validation(&file_path));
}
#[test]
fn test_corruption_detection_and_recovery() {
    let tmp = TempDir::new().unwrap();
    let file_path = tmp.path().join("corrupt_test.dat");

    // Header checksummed over one payload, a different payload written.
    {
        let expected = vec![42u8; 100];
        let actual = vec![99u8; 100];
        let mut file = MemoryMappedFile::create(&file_path, 1024).unwrap();
        let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &expected);
        file.write_at(0, &header).unwrap();
        file.write_slice_at(FileHeader::SIZE, &actual).unwrap();
        file.sync().unwrap();
    }

    let mut manager = IntegrityManager::with_default_config();
    let outcome = manager.validate_file_path(&file_path).unwrap();
    assert!(!outcome.is_valid());
    let report = outcome.corruption_report().unwrap();
    assert_eq!(report.corruption_type, CorruptionType::DataCorruption);

    // Checksum corruption has no automatic fix, so recovery reports failure.
    let recovered = manager.attempt_recovery(report).unwrap();
    assert!(!recovered);
}
#[test]
fn test_integrity_statistics() {
    let tmp = TempDir::new().unwrap();
    let paths = [tmp.path().join("file1.dat"), tmp.path().join("file2.dat")];

    // Two well-formed files.
    for path in &paths {
        let payload = vec![42u8; 1024 - FileHeader::SIZE];
        let mut file = MemoryMappedFile::create(path, 1024).unwrap();
        let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &payload);
        file.write_at(0, &header).unwrap();
        file.write_slice_at(FileHeader::SIZE, &payload).unwrap();
        file.sync().unwrap();
    }

    let mut manager = IntegrityManager::with_default_config();
    for path in &paths {
        manager.validate_file_path(path).unwrap();
    }

    // Two clean validations: counters advance, no corruption recorded, and
    // the last-validation timestamp is set.
    let stats = manager.stats();
    assert_eq!(stats.total_validations, 2);
    assert_eq!(stats.corruptions_detected, 0);
    assert!(stats.total_bytes_validated > 0);
    assert!(stats.last_validation.is_some());
}
#[test]
fn test_cross_validation_capability() {
    let tmp = TempDir::new().unwrap();

    // One posting referencing a 3-element range...
    let posting_path = tmp.path().join("postings.dat");
    let mut postings = crate::posting_storage::PostingStorage::create(&posting_path, 10).unwrap();
    postings
        .add_posting(crate::identifiers::DocumentId::new(), 0, 3)
        .unwrap();
    postings.sync().unwrap();

    // ...and one 3-dimensional vector alongside it.
    let vector_path = tmp.path().join("vectors.dat");
    let mut vectors = crate::vector_storage::VectorStorage::create(&vector_path, 3, 10).unwrap();
    let v = vec![1.0, 2.0, 3.0];
    vectors.add_vector(&v).unwrap();
    vectors.sync().unwrap();

    // Both storages validate cleanly in isolation.
    let mut manager = IntegrityManager::with_default_config();
    let posting_check = manager.validate_file(postings.memory_mapped_file()).unwrap();
    let vector_check = manager.validate_file(vectors.memory_mapped_file()).unwrap();
    assert!(posting_check.is_valid());
    assert!(vector_check.is_valid());
}
#[tokio::test]
async fn test_integrity_checker_creation() {
    let tmp = TempDir::new().unwrap();
    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);

    // The default constructor enables repair; the read-only one disables it.
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config.clone());
    assert!(checker.repair_enabled);
    let read_only = IntegrityChecker::new_read_only(tmp.path().to_path_buf(), config);
    assert!(!read_only.repair_enabled);

    // Component discovery registers the core storage categories even for an
    // empty directory.
    checker.discover_components().unwrap();
    assert!(checker.component_paths.contains_key(&ComponentType::VectorStorage));
    assert!(checker.component_paths.contains_key(&ComponentType::PostingStorage));
}
#[tokio::test]
async fn test_integrity_report_creation() {
    // A fresh report starts valid and empty.
    let mut report = IntegrityReport::new();
    assert!(report.is_valid);
    assert!(report.component_results.is_empty());
    assert!(report.cross_reference_issues.is_empty());

    // A passing component result leaves the report valid.
    let passing = ValidationResult::success(Duration::from_millis(100), 1024, 0x12345678);
    report.add_component_result(ComponentType::VectorStorage, passing);
    assert!(report.is_valid);
    assert_eq!(report.component_results.len(), 1);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: PathBuf::from("/test/file"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Test corruption".to_string(),
        recovery_recommendations: vec!["Test recovery".to_string()],
        severity: 0.5,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    // A single failing component flips the whole report to invalid.
    let failing = ValidationResult::failure(corruption.clone(), Duration::from_millis(200), 2048, 0x87654321);
    report.add_component_result(ComponentType::PostingStorage, failing);
    assert!(!report.is_valid);

    report.add_cross_reference_issue(corruption);
    assert_eq!(report.cross_reference_issues.len(), 1);

    // The generated summary is non-empty and mentions cross-reference issues.
    report.generate_summary();
    assert!(!report.summary.is_empty());
    assert!(report.summary.contains("cross-reference"));
}
#[tokio::test]
async fn test_repair_report_functionality() {
    // An empty report: 0% success rate, no outstanding critical issues.
    let mut report = RepairReport::new();
    assert_eq!(report.success_rate(), 0.0);
    assert!(report.all_critical_resolved);

    // One successful repair -> 100% success rate, one issue repaired.
    let minor_issue = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: PathBuf::from("/test/file1"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Test corruption 1".to_string(),
        recovery_recommendations: vec!["Test recovery 1".to_string()],
        severity: 0.5,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    report.add_repair_result(RepairResult {
        issue: minor_issue,
        success: true,
        action_taken: "Fixed the issue".to_string(),
        repair_time: Duration::from_millis(500),
        notes: vec!["Repair was successful".to_string()],
    });
    assert_eq!(report.success_rate(), 1.0);
    assert_eq!(report.issues_repaired, 1);

    // A failed repair of a critical, unrecoverable issue halves the success
    // rate and marks critical issues as unresolved.
    let critical_issue = CorruptionReport {
        corruption_type: CorruptionType::HeaderCorruption,
        file_path: PathBuf::from("/test/file2"),
        corruption_offset: Some(0),
        corruption_size: Some(100),
        description: "Critical corruption".to_string(),
        recovery_recommendations: vec!["Manual intervention required".to_string()],
        severity: 0.9,
        is_recoverable: false,
        detected_at: SystemTime::now(),
    };
    report.add_repair_result(RepairResult {
        issue: critical_issue,
        success: false,
        action_taken: "Could not repair".to_string(),
        repair_time: Duration::from_millis(100),
        notes: vec!["Manual intervention required".to_string()],
    });
    assert_eq!(report.success_rate(), 0.5);
    assert!(!report.all_critical_resolved);
}
#[tokio::test]
async fn test_integrity_issue_categorization() {
    // Header corruption: top priority, blocking, hardest to repair, not
    // auto-repairable.
    let header_corruption = CorruptionReport {
        corruption_type: CorruptionType::HeaderCorruption,
        file_path: PathBuf::from("/test/file"),
        corruption_offset: Some(0),
        corruption_size: Some(32),
        description: "Header magic bytes corrupted".to_string(),
        recovery_recommendations: vec!["Restore from backup".to_string()],
        severity: 0.9,
        is_recoverable: false,
        detected_at: SystemTime::now(),
    };
    let header_issue = IntegrityIssue::from_corruption(header_corruption);
    assert_eq!(header_issue.priority, 1);
    assert!(header_issue.blocking);
    assert_eq!(header_issue.repair_difficulty, 5);
    assert!(!header_issue.auto_repairable);

    // Data corruption: lower priority, non-blocking, easier and auto-repairable.
    let data_corruption = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: PathBuf::from("/test/file"),
        corruption_offset: Some(1024),
        corruption_size: Some(256),
        description: "Data checksum mismatch".to_string(),
        recovery_recommendations: vec!["Recalculate checksum".to_string()],
        severity: 0.6,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let data_issue = IntegrityIssue::from_corruption(data_corruption);
    assert_eq!(data_issue.priority, 2);
    assert!(!data_issue.blocking);
    assert_eq!(data_issue.repair_difficulty, 2);
    assert!(data_issue.auto_repairable);
}
#[tokio::test]
async fn test_component_type_validation() {
    let tmp = TempDir::new().unwrap();
    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    // Standard index directory layout.
    for dir in ["shards", "wal", "centroids"] {
        fs::create_dir_all(tmp.path().join(dir)).unwrap();
    }

    // Place one vector file and one posting file in the shards directory.
    let _vector_file = NamedTempFile::new_in(tmp.path().join("shards")).unwrap();
    let vector_path = tmp.path().join("shards/test_shard.vectors");
    let _mmf = MemoryMappedFile::create(&vector_path, 1024).unwrap();
    let _posting_file = NamedTempFile::new_in(tmp.path().join("shards")).unwrap();
    let posting_path = tmp.path().join("shards/test_shard.postings");
    let _mmf2 = MemoryMappedFile::create(&posting_path, 1024).unwrap();

    // Discovery must find at least one file in each storage category.
    checker.discover_components().unwrap();
    let vector_files = checker.component_paths.get(&ComponentType::VectorStorage).unwrap();
    assert!(!vector_files.is_empty());
    let posting_files = checker.component_paths.get(&ComponentType::PostingStorage).unwrap();
    assert!(!posting_files.is_empty());
}
#[tokio::test]
async fn test_full_integrity_verification() {
    let tmp = TempDir::new().unwrap();
    fs::create_dir_all(tmp.path().join("shards")).unwrap();

    // A matched shard pair: one populated vector file and one posting file.
    let vector_path = tmp.path().join("shards/test_shard.vectors");
    let mut vectors = crate::vector_storage::VectorStorage::create(&vector_path, 128, 10).unwrap();
    let v = vec![0.5; 128];
    vectors.add_vector(&v).unwrap();
    vectors.sync().unwrap();

    let posting_path = tmp.path().join("shards/test_shard.postings");
    let mut postings = crate::posting_storage::PostingStorage::create(&posting_path, 10).unwrap();
    postings.add_posting(DocumentId::new(), 100, 50).unwrap();
    postings.sync().unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    // A healthy index verifies cleanly end to end.
    let report = checker.verify_full_integrity().await.unwrap();
    assert!(report.is_valid);
    assert!(!report.component_results.is_empty());
    assert!(report.cross_reference_issues.is_empty());
    assert!(report.total_bytes_validated > 0);
    assert!(!report.summary.is_empty());
}
#[tokio::test]
async fn test_incremental_integrity_verification() {
    let tmp = TempDir::new().unwrap();
    fs::create_dir_all(tmp.path().join("shards")).unwrap();
    let vector_path = tmp.path().join("shards/test_shard.vectors");
    let _vectors = crate::vector_storage::VectorStorage::create(&vector_path, 128, 10).unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    // Restricting verification to vector storage must produce results for
    // that component only.
    let components = vec![ComponentType::VectorStorage];
    let report = checker.verify_incremental(&components).await.unwrap();
    assert!(report.component_results.contains_key(&ComponentType::VectorStorage));
    assert!(!report.component_results.contains_key(&ComponentType::PostingStorage));
}
#[tokio::test]
async fn test_cross_reference_validation() {
    let tmp = TempDir::new().unwrap();
    fs::create_dir_all(tmp.path().join("shards")).unwrap();

    // Matching vector and posting files for the same shard id.
    let shard_id = "test_shard_123";
    let vector_path = tmp.path().join(format!("shards/{}.vectors", shard_id));
    let posting_path = tmp.path().join(format!("shards/{}.postings", shard_id));

    let mut vectors = crate::vector_storage::VectorStorage::create(&vector_path, 128, 10).unwrap();
    let v = vec![0.5; 128];
    vectors.add_vector(&v).unwrap();
    vectors.sync().unwrap();

    let mut postings = crate::posting_storage::PostingStorage::create(&posting_path, 10).unwrap();
    postings.add_posting(DocumentId::new(), 100, 50).unwrap();
    postings.sync().unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);
    checker.discover_components().unwrap();

    let posting_files = checker
        .component_paths
        .get(&ComponentType::PostingStorage)
        .unwrap()
        .clone();
    let vector_files = checker
        .component_paths
        .get(&ComponentType::VectorStorage)
        .unwrap()
        .clone();

    // A properly paired shard produces no cross-reference issues.
    let issues = checker
        .verify_posting_vector_consistency(&posting_files, &vector_files)
        .await
        .unwrap();
    assert!(issues.is_empty());
}
#[tokio::test]
async fn test_cross_reference_validation_missing_files() {
    let tmp = TempDir::new().unwrap();
    fs::create_dir_all(tmp.path().join("shards")).unwrap();

    // A vector file with no posting counterpart (an "orphan" shard).
    let vector_path = tmp.path().join("shards/orphan_shard.vectors");
    let _vectors = crate::vector_storage::VectorStorage::create(&vector_path, 128, 10).unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    let posting_files = vec![];
    let vector_files = vec![vector_path];
    let issues = checker
        .verify_posting_vector_consistency(&posting_files, &vector_files)
        .await
        .unwrap();

    // The orphan is reported as a cross-validation failure.
    assert!(!issues.is_empty());
    assert_eq!(issues[0].corruption_type, CorruptionType::CrossValidationFailure);
    assert!(issues[0].description.contains("no corresponding posting storage"));
}
#[tokio::test]
async fn test_repair_functionality() {
    let tmp = TempDir::new().unwrap();
    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config.clone());
    let mut read_only = IntegrityChecker::new_read_only(tmp.path().to_path_buf(), config);
    read_only.set_repair_enabled(false);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: tmp.path().join("test_file"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Test corruption for repair".to_string(),
        recovery_recommendations: vec!["Test recovery".to_string()],
        severity: 0.5,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let issues = vec![IntegrityIssue::from_corruption(corruption)];

    // Repair is rejected outright when disabled...
    assert!(read_only.attempt_repair(&issues).await.is_err());
    // ...and attempted (exactly one issue) when enabled.
    let repair_report = checker.attempt_repair(&issues).await.unwrap();
    assert_eq!(repair_report.issues_attempted, 1);
}
#[tokio::test]
async fn test_vector_quality_validation() {
    let tmp = TempDir::new().unwrap();
    let vector_path = tmp.path().join("test_vectors.dat");

    // One ordinary vector plus one all-zero vector; the file as a whole should
    // still pass quality validation.
    let mut vectors = crate::vector_storage::VectorStorage::create(&vector_path, 4, 10).unwrap();
    let ordinary = vec![1.0, 2.0, 3.0, 4.0];
    vectors.add_vector(&ordinary).unwrap();
    let all_zero = vec![0.0, 0.0, 0.0, 0.0];
    vectors.add_vector(&all_zero).unwrap();
    vectors.sync().unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(4);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);
    let result = checker.verify_vector_storage_file(&vector_path).await.unwrap();
    assert!(result.is_valid());
}
#[test]
fn test_component_type_name() {
    // Every component type maps to its exact variant name.
    let cases = [
        (ComponentType::VectorStorage, "VectorStorage"),
        (ComponentType::PostingStorage, "PostingStorage"),
        (ComponentType::WalSegments, "WalSegments"),
        (ComponentType::BloomFilters, "BloomFilters"),
        (ComponentType::ShardexSegments, "ShardexSegments"),
        (ComponentType::CrossReferences, "CrossReferences"),
    ];
    for (component, expected) in cases {
        assert_eq!(component_type_name(component), expected);
    }
}
#[tokio::test]
async fn test_repair_strategies() {
    let tmp = TempDir::new().unwrap();
    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    // NaN contamination cannot be repaired automatically.
    let nan_corruption = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: tmp.path().join("test_file"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Excessive NaN values in vectors: 15.0% (3/20)".to_string(),
        recovery_recommendations: vec!["Check vector computation logic".to_string()],
        severity: 0.8,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let (repaired, action, notes) = checker
        .repair_vector_quality_issues(&nan_corruption)
        .await
        .unwrap();
    assert!(!repaired);
    assert!(action.contains("NaN repair"));
    assert!(!notes.is_empty());

    // Posting/vector count mismatches are reported as auto-repairable.
    let count_mismatch = CorruptionReport {
        corruption_type: CorruptionType::CrossValidationFailure,
        file_path: tmp.path().join("test_file"),
        corruption_offset: None,
        corruption_size: None,
        description: "Count mismatch: posting storage has 5, vector storage has 3".to_string(),
        recovery_recommendations: vec!["Run consistency repair".to_string()],
        severity: 0.7,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let (repaired, action, notes) = checker
        .repair_count_mismatch(&count_mismatch)
        .await
        .unwrap();
    assert!(repaired);
    assert!(action.contains("can be automatically repaired"));
    assert!(!notes.is_empty());
}
#[tokio::test]
async fn test_capacity_consistency_validation() {
    let tmp = TempDir::new().unwrap();

    // Posting storage sized for 10 entries, vector storage for only 5.
    let posting_path = tmp.path().join("test_shard.postings");
    let _postings = crate::posting_storage::PostingStorage::create(&posting_path, 10).unwrap();
    let vector_path = tmp.path().join("test_shard.vectors");
    let _vectors = crate::vector_storage::VectorStorage::create(&vector_path, 128, 5).unwrap();

    let config = ShardexConfig::new()
        .directory_path(tmp.path().to_path_buf())
        .vector_size(128);
    let checker = IntegrityChecker::new(tmp.path().to_path_buf(), config);

    let posting_mmf = MemoryMappedFile::open_read_only(&posting_path).unwrap();
    let vector_mmf = MemoryMappedFile::open_read_only(&vector_path).unwrap();
    let result = checker
        .verify_header_compatibility(&posting_mmf, &vector_mmf)
        .await;

    // The differing capacities must surface as a cross-validation error.
    let error = result.unwrap_err();
    assert_eq!(error.corruption_type, CorruptionType::CrossValidationFailure);
    assert!(error.description.contains("Capacity mismatch"));
}
#[test]
fn test_detailed_corruption_analysis() {
    // Build a file whose header checksum matches the pristine payload, then
    // persist a payload with two flipped bytes so detailed analysis reports a
    // checksum mismatch with recovery recommendations.
    let temp_dir = TempDir::new().unwrap();
    let file_path = temp_dir.path().join("detailed_analysis.dat");
    let mut mmf = MemoryMappedFile::create(&file_path, 1024).unwrap();
    let data_size = 1024 - FileHeader::SIZE;

    // The checksum in the header is computed over the *clean* payload...
    let clean_data = vec![42u8; data_size];
    let header = FileHeader::new(magic::TEST_GENERIC, 1, FileHeader::SIZE as u64, &clean_data);

    // ...but the payload actually written has two corrupted bytes. Deriving it
    // from clean_data (instead of a second inline vec!) makes the intent
    // explicit and avoids a redundant allocation.
    let mut corrupted_data = clean_data;
    corrupted_data[50] = 0xFF;
    corrupted_data[51] = 0xFF;

    mmf.write_at(0, &header).unwrap();
    mmf.write_slice_at(FileHeader::SIZE, &corrupted_data).unwrap();
    mmf.sync().unwrap();
    drop(mmf);

    // Enable detailed analysis so the report carries a full description.
    let config = IntegrityConfig {
        detailed_analysis: true,
        ..Default::default()
    };
    let mut manager = IntegrityManager::new(config);
    let result = manager.validate_file_path(&file_path).unwrap();
    assert!(!result.is_valid());

    let report = result.corruption_report().unwrap();
    assert!(report.description.contains("Checksum mismatch"));
    assert!(!report.recovery_recommendations.is_empty());
    assert_eq!(report.corruption_type, CorruptionType::DataCorruption);
    assert!(report.severity > 0.0);
}
#[test]
fn test_verify_posting_data_quality_valid_data() {
// Well-formed posting data (two postings, valid document IDs, non-zero
// lengths, nothing flagged deleted) must pass quality verification.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header_size = std::mem::size_of::<crate::posting_storage::PostingStorageHeader>();
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 128),
capacity: 2,
current_count: 2,
active_count: 2,
// Arrays laid out back-to-back after the header: doc IDs (2 x 16 bytes),
// starts (2 x u32), lengths (2 x u32), then the deleted-flags bitmap.
document_ids_offset: header_size as u64, starts_offset: (header_size + 2 * 16) as u64, lengths_offset: (header_size + 2 * 16 + 2 * 4) as u64, deleted_flags_offset: (header_size + 2 * 16 + 2 * 4 + 2 * 4) as u64, document_id_size: 16,
reserved: [0; 12],
};
// NOTE(review): this buffer is indexed from 0 while the header offsets above
// start at header_size — presumably verify_posting_data_quality treats the
// offsets as relative to the data region following the header; confirm.
let mut posting_data = vec![0u8; 2 * 16 + 2 * 4 + 2 * 4 + 1];
use crate::identifiers::DocumentId;
// Two distinct, non-zero document IDs at positions 0 and 1.
let doc_id1 = DocumentId::from_raw(1000000000000000000u128);
let doc_id2 = DocumentId::from_raw(2000000000000000000u128);
posting_data[0..16].copy_from_slice(&doc_id1.to_bytes());
posting_data[16..32].copy_from_slice(&doc_id2.to_bytes());
// Start offsets for the two postings.
let starts_offset = 2 * 16;
posting_data[starts_offset..starts_offset + 4].copy_from_slice(&100u32.to_le_bytes());
posting_data[starts_offset + 4..starts_offset + 8].copy_from_slice(&200u32.to_le_bytes());
// Non-zero lengths so the zero-length heuristic is not triggered.
let lengths_offset = 2 * 16 + 2 * 4;
posting_data[lengths_offset..lengths_offset + 4].copy_from_slice(&50u32.to_le_bytes());
posting_data[lengths_offset + 4..lengths_offset + 8].copy_from_slice(&75u32.to_le_bytes());
// No postings marked deleted.
let deleted_flags_offset = 2 * 16 + 2 * 4 + 2 * 4;
posting_data[deleted_flags_offset] = 0b00000000;
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(
result.is_ok(),
"Expected valid data to pass verification, but got error: {:?}",
result.err()
);
}
#[test]
fn test_verify_posting_data_quality_empty_storage() {
// An empty storage (current_count == 0) has no postings to inspect and
// must pass quality verification.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 64),
capacity: 10,
current_count: 0,
active_count: 0,
// With no entries, all data arrays nominally start at 64 and are empty.
document_ids_offset: 64,
starts_offset: 64,
lengths_offset: 64,
deleted_flags_offset: 64,
document_id_size: 16,
reserved: [0; 12],
};
let posting_data = vec![0u8; 64];
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_ok());
}
#[test]
fn test_verify_posting_data_quality_count_exceeds_capacity() {
// current_count (10) larger than capacity (5) is a structural inconsistency
// that must be rejected as severe-but-recoverable.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 320),
capacity: 5,
// Deliberately inconsistent: counts exceed the declared capacity.
current_count: 10, active_count: 10,
document_ids_offset: 64,
starts_offset: 224,
lengths_offset: 264,
deleted_flags_offset: 304,
document_id_size: 16,
reserved: [0; 12],
};
// Buffer contents are irrelevant; the header check fires first.
let posting_data = vec![0u8; 320];
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.description.contains("Current count"));
assert!(error.description.contains("exceeds capacity"));
assert_eq!(error.severity, 0.9);
assert!(error.is_recoverable);
}
#[test]
fn test_verify_posting_data_quality_arrays_beyond_bounds() {
// Header offsets that place an array past the end of the data buffer
// (deleted_flags_offset = 500 with only 200 bytes) must be rejected as
// maximally severe and unrecoverable.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 120),
capacity: 10,
current_count: 2,
active_count: 2,
// deleted_flags_offset (500) is beyond the 200-byte buffer below.
document_ids_offset: 144, starts_offset: 176, lengths_offset: 184, deleted_flags_offset: 500, document_id_size: 16,
reserved: [0; 12],
};
let posting_data = vec![0u8; 200]; let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.description.contains("extend beyond file bounds"));
assert_eq!(error.severity, 1.0);
assert!(!error.is_recoverable);
}
#[test]
fn test_verify_posting_data_quality_too_many_invalid_document_ids() {
// Only 1 of 5 document IDs is written; the remaining four stay all-zero
// bytes, which must trip the invalid-document-ID threshold.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 200),
capacity: 10,
current_count: 5,
active_count: 5,
// IDs at 144 (5 x 16B), starts at 224, lengths at 244, flags at 264.
document_ids_offset: 144, starts_offset: 224, lengths_offset: 244, deleted_flags_offset: 264, document_id_size: 16,
reserved: [0; 12],
};
let mut posting_data = vec![0u8; 300];
// Write exactly one valid ID; slots 2-5 remain zeroed (invalid).
let valid_doc_id = DocumentId::new().raw().to_le_bytes();
posting_data[144..160].copy_from_slice(&valid_doc_id);
// All starts/lengths are plausible so only the ID check can fail.
for i in 0..5 {
posting_data[224 + i * 4..228 + i * 4].copy_from_slice(&(100u32 + i as u32 * 50).to_le_bytes());
posting_data[244 + i * 4..248 + i * 4].copy_from_slice(&50u32.to_le_bytes());
}
// No deleted flags set.
posting_data[264] = 0b00000000;
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.description.contains("Too many invalid document IDs"));
assert_eq!(error.severity, 0.8);
assert!(error.is_recoverable);
}
#[test]
fn test_verify_posting_data_quality_too_many_zero_lengths() {
// 8 of 10 postings have length 0; that ratio must trip the zero-length
// heuristic with a low (0.4) severity.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header_size = std::mem::size_of::<crate::posting_storage::PostingStorageHeader>();
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 320),
capacity: 10,
current_count: 10,
active_count: 10,
// Back-to-back layout: IDs (10 x 16B), starts (10 x u32), lengths
// (10 x u32), then the deleted-flags bitmap.
// NOTE(review): buffer below is indexed from 0 while these offsets start at
// header_size — presumably treated as relative to the post-header region.
document_ids_offset: header_size as u64, starts_offset: (header_size + 10 * 16) as u64, lengths_offset: (header_size + 10 * 16 + 10 * 4) as u64, deleted_flags_offset: (header_size + 10 * 16 + 10 * 4 + 10 * 4) as u64, document_id_size: 16,
reserved: [0; 12],
};
let data_size = 10 * 16 + 10 * 4 + 10 * 4 + 2;
let mut posting_data = vec![0u8; data_size];
// Ten distinct, valid document IDs.
for i in 0..10 {
let doc_id = DocumentId::from_raw(1000u128 + i as u128)
.raw()
.to_le_bytes();
posting_data[i * 16..(i + 1) * 16].copy_from_slice(&doc_id);
}
// Plausible, distinct start offsets for every posting.
let starts_offset = 10 * 16;
for i in 0..10 {
posting_data[starts_offset + i * 4..starts_offset + (i + 1) * 4]
.copy_from_slice(&(100u32 + i as u32 * 10).to_le_bytes());
}
// Postings 0-7 get length 0 (the defect under test)...
let lengths_offset = 10 * 16 + 10 * 4;
for i in 0..8 {
posting_data[lengths_offset + i * 4..lengths_offset + (i + 1) * 4].copy_from_slice(&0u32.to_le_bytes());
}
// ...while postings 8 and 9 have sane non-zero lengths.
posting_data[lengths_offset + 8 * 4..lengths_offset + 9 * 4].copy_from_slice(&50u32.to_le_bytes());
posting_data[lengths_offset + 9 * 4..lengths_offset + 10 * 4].copy_from_slice(&75u32.to_le_bytes());
// Nothing marked deleted (10 postings span two bitmap bytes).
let deleted_flags_offset = 10 * 16 + 10 * 4 + 10 * 4;
posting_data[deleted_flags_offset] = 0b00000000;
posting_data[deleted_flags_offset + 1] = 0b00000000;
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err(), "Expected error but got success");
let error = result.unwrap_err();
assert!(error.description.contains("Too many zero-length postings:"));
assert_eq!(error.severity, 0.4);
assert!(error.is_recoverable);
}
#[test]
fn test_verify_posting_data_quality_overflow_ranges() {
// Posting 0 starts at u32::MAX - 10 with length 20, so start + length
// overflows u32 — this must be flagged as recoverable data corruption.
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 120),
capacity: 10,
current_count: 2,
active_count: 2,
// IDs at 144 (2 x 16B), starts at 176, lengths at 184, flags at 192.
document_ids_offset: 144, starts_offset: 176, lengths_offset: 184, deleted_flags_offset: 192, document_id_size: 16,
reserved: [0; 12],
};
let mut posting_data = vec![0u8; 200];
// Two valid document IDs so only the range check can fail.
let doc_id1 = DocumentId::new().raw().to_le_bytes();
let doc_id2 = DocumentId::new().raw().to_le_bytes();
posting_data[144..160].copy_from_slice(&doc_id1);
posting_data[160..176].copy_from_slice(&doc_id2);
// start0 near u32::MAX (overflowing with its length); start1 is sane.
posting_data[176..180].copy_from_slice(&(u32::MAX - 10).to_le_bytes());
posting_data[180..184].copy_from_slice(&100u32.to_le_bytes());
// len0 = 20 pushes start0 + len0 past u32::MAX; len1 = 50 is fine.
posting_data[184..188].copy_from_slice(&20u32.to_le_bytes()); posting_data[188..192].copy_from_slice(&50u32.to_le_bytes());
// No deleted flags set.
posting_data[192] = 0b00000000;
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err());
let error = result.unwrap_err();
assert_eq!(error.corruption_type, CorruptionType::DataCorruption);
assert!(error.is_recoverable);
}
// Verifies that a header whose active_count disagrees with the number of
// non-deleted postings is reported as corruption with severity 0.6.
#[test]
fn test_verify_posting_data_quality_active_count_mismatch() {
let temp_dir = TempDir::new().unwrap();
let config = ShardexConfig::default();
let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
// Header claims only 2 active postings even though 3 are stored and none
// are flagged deleted. Layout: doc ids at 144 (3 x 16 bytes), starts at
// 192, lengths at 204, deleted flags at 216.
let header = crate::posting_storage::PostingStorageHeader {
file_header: crate::memory::FileHeader::new_without_checksum(magic::POSTING_STORAGE, 1, 140),
capacity: 10,
current_count: 3,
active_count: 2, document_ids_offset: 144, starts_offset: 192, lengths_offset: 204, deleted_flags_offset: 216, document_id_size: 16,
reserved: [0; 12],
};
let mut posting_data = vec![0u8; 220];
// Three valid little-endian document ids.
for i in 0..3 {
let doc_id = DocumentId::new().raw().to_le_bytes();
posting_data[144 + i * 16..160 + i * 16].copy_from_slice(&doc_id);
}
// Three well-formed (start, length) pairs: starts 100/150/200, each length 50.
for i in 0..3 {
posting_data[192 + i * 4..196 + i * 4].copy_from_slice(&(100u32 + i as u32 * 50).to_le_bytes());
posting_data[204 + i * 4..208 + i * 4].copy_from_slice(&50u32.to_le_bytes());
}
// No deleted flags set, so the actual active count is 3, not the claimed 2.
posting_data[216] = 0b00000000;
let result = checker.verify_posting_data_quality(&posting_data, &header);
assert!(result.is_err());
let error = result.unwrap_err();
assert!(error.description.contains("Active count mismatch"));
assert!(error.description.contains("header claims 2, actual 3"));
// Count mismatches are moderate-severity, recoverable corruption.
assert_eq!(error.severity, 0.6);
assert!(error.is_recoverable);
}
// Sanity check for the overflow condition exercised by the posting-range
// tests: (u32::MAX - 10) + 20 does not fit in u32, and checked_add reports
// that as None instead of wrapping.
#[test]
fn test_overflow_arithmetic() {
    let base = u32::MAX - 10;
    let delta = 20u32;
    let sum = base.checked_add(delta);
    assert!(sum.is_none(), "Expected overflow but got {:?}", sum);
}
// Header corruption (bad magic bytes) has no automatic repair path, so
// attempt_recovery must succeed as a call but report that recovery failed.
#[test]
fn test_recovery_header_corruption() {
    let temp_dir = TempDir::new().unwrap();
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let report = CorruptionReport {
        corruption_type: CorruptionType::HeaderCorruption,
        file_path: temp_dir.path().join("test_vector.vec"),
        corruption_offset: Some(0),
        corruption_size: Some(FileHeader::SIZE as u64),
        description: "magic bytes corrupted".to_string(),
        recovery_recommendations: vec!["Restore from backup".to_string()],
        severity: 0.9,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let recovered = manager.attempt_recovery(&report).unwrap();
    assert!(!recovered);
}
// File truncation recovery depends on how much data was lost: a small
// truncation is repairable, a large one is not.
#[test]
fn test_recovery_file_truncation() {
    let temp_dir = TempDir::new().unwrap();

    // Case 1: minor truncation (1 KiB lost, low severity) — recoverable.
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let small_truncation = CorruptionReport {
        corruption_type: CorruptionType::FileTruncation,
        file_path: temp_dir.path().join("test.dat"),
        corruption_offset: Some(4096),
        corruption_size: Some(1024),
        description: "File truncated at offset 4096".to_string(),
        recovery_recommendations: vec!["Extend file".to_string()],
        severity: 0.3,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    assert!(manager.attempt_recovery(&small_truncation).unwrap());

    // Case 2: severe truncation (~1 MB lost, high severity) — not recoverable.
    let mut severe_manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let severe_truncation = CorruptionReport {
        corruption_type: CorruptionType::FileTruncation,
        file_path: temp_dir.path().join("test2.dat"),
        corruption_offset: Some(100),
        corruption_size: Some(1000000),
        description: "Severe file truncation".to_string(),
        recovery_recommendations: vec![],
        severity: 0.9,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    assert!(!severe_manager.attempt_recovery(&severe_truncation).unwrap());
}
// Structural inconsistencies (e.g. a corrupted index structure) are expected
// to be repairable when recovery is enabled.
#[test]
fn test_recovery_structural_inconsistency() {
    let temp_dir = TempDir::new().unwrap();
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let report = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: temp_dir.path().join("test_index.idx"),
        corruption_offset: Some(1000),
        corruption_size: Some(256),
        description: "index structure corruption detected".to_string(),
        recovery_recommendations: vec!["Rebuild index".to_string()],
        severity: 0.6,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let recovered = manager.attempt_recovery(&report).unwrap();
    assert!(recovered);
}
// attempt_recovery must handle every corruption variant without returning an
// error, regardless of whether the recovery itself succeeds.
#[test]
fn test_attempt_recovery_all_types() {
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let all_types = [
        CorruptionType::DataCorruption,
        CorruptionType::HeaderCorruption,
        CorruptionType::FileTruncation,
        CorruptionType::StructuralInconsistency,
        CorruptionType::CrossValidationFailure,
        CorruptionType::PartialCorruption,
    ];
    for kind in all_types {
        // Same description text as before: "Test <variant-in-lowercase> corruption".
        let label = format!("{:?}", kind).to_lowercase();
        let report = CorruptionReport {
            corruption_type: kind.clone(),
            file_path: PathBuf::from("/test/file.dat"),
            corruption_offset: Some(1000),
            corruption_size: Some(100),
            description: format!("Test {} corruption", label),
            recovery_recommendations: vec![],
            severity: 0.5,
            is_recoverable: true,
            detected_at: SystemTime::now(),
        };
        let outcome = manager.attempt_recovery(&report);
        assert!(outcome.is_ok(), "Recovery attempt failed for {:?}", kind);
    }
}
// A small region of data corruption (50 bytes, moderate severity) should be
// recoverable when recovery is enabled.
#[test]
fn test_recovery_data_corruption() {
    let temp_dir = TempDir::new().unwrap();
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: true,
        ..IntegrityConfig::default()
    });
    let report = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: temp_dir.path().join("test.dat"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Small data corruption detected".to_string(),
        recovery_recommendations: vec!["Attempt data recovery".to_string()],
        severity: 0.4,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let recovered = manager.attempt_recovery(&report).unwrap();
    assert!(recovered);
}
// With enable_recovery = false, attempt_recovery must still return Ok but
// never actually perform a recovery.
#[test]
fn test_recovery_with_disabled_config() {
    let mut manager = IntegrityManager::new(IntegrityConfig {
        enable_recovery: false,
        ..IntegrityConfig::default()
    });
    let report = CorruptionReport {
        corruption_type: CorruptionType::DataCorruption,
        file_path: PathBuf::from("/test/file.dat"),
        corruption_offset: Some(100),
        corruption_size: Some(50),
        description: "Test corruption".to_string(),
        recovery_recommendations: vec![],
        severity: 0.5,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let outcome = manager.attempt_recovery(&report);
    assert!(outcome.is_ok());
    assert!(!outcome.unwrap());
}
// Running structural-inconsistency repair against a real vector storage file
// with a pointer-corruption description should produce pointer-related repair
// notes and a message mentioning pointers.
#[tokio::test]
async fn test_pointer_corruption_repair_vector_storage() {
    let temp_dir = TempDir::new().unwrap();
    let shardex_config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), shardex_config);

    // Create a valid 128-dim, 10-capacity vector storage file to repair.
    let storage_path = temp_dir.path().join("test_vectors.vectors");
    let _storage = crate::vector_storage::VectorStorage::create(&storage_path, 128, 10).unwrap();

    let report = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: storage_path,
        corruption_offset: Some(100),
        corruption_size: Some(8),
        description: "pointer corruption in vector storage detected".to_string(),
        recovery_recommendations: vec!["Validate and repair vector offsets".to_string()],
        severity: 0.6,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let outcome = checker.repair_structural_inconsistency(&report).await;
    assert!(outcome.is_ok());
    let (_success, message, notes) = outcome.unwrap();
    assert!(!notes.is_empty());
    assert!(message.contains("Pointer") || message.contains("pointer"));
    assert!(notes.iter().any(|note| note.contains("Validating pointers")));
}
// Structural-inconsistency repair against a posting storage file with an
// offset-corruption description should produce posting-related repair notes.
#[tokio::test]
async fn test_pointer_corruption_repair_posting_storage() {
    let temp_dir = TempDir::new().unwrap();
    let shardex_config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), shardex_config);

    // Create a valid 10-capacity posting storage file to repair.
    let storage_path = temp_dir.path().join("test_postings.postings");
    let _storage = crate::posting_storage::PostingStorage::create(&storage_path, 10).unwrap();

    let report = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: storage_path,
        corruption_offset: Some(200),
        corruption_size: Some(8),
        description: "offset corruption in posting storage detected".to_string(),
        recovery_recommendations: vec!["Repair vector offset pointers".to_string()],
        severity: 0.7,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };
    let outcome = checker.repair_structural_inconsistency(&report).await;
    assert!(outcome.is_ok());
    let (_success, _message, notes) = outcome.unwrap();
    assert!(!notes.is_empty());
    assert!(notes.iter().any(|note| note.contains("posting") || note.contains("Posting")));
}
// A header whose magic bytes are not one of the known Shardex magics should be
// flagged during pointer validation, and the repair notes should mention it.
//
// Fixed: the original built an unused `_checker` and then a second checker
// from a duplicated ShardexConfig; a single mutable checker is sufficient.
#[tokio::test]
async fn test_pointer_validation_invalid_magic_bytes() {
    let temp_dir = TempDir::new().unwrap();

    // Write a header with bogus magic bytes ("XXXX") into a fresh mapped file.
    let test_file = temp_dir.path().join("invalid_magic.dat");
    let mut mmf = MemoryMappedFile::create(&test_file, 1024).unwrap();
    let invalid_header = crate::memory::StandardHeader::new_without_checksum(
        b"XXXX",
        1,
        crate::memory::StandardHeader::SIZE as u64,
    );
    mmf.write_at(0, &invalid_header).unwrap();
    mmf.sync().unwrap();
    drop(mmf);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: test_file,
        corruption_offset: Some(0),
        corruption_size: Some(4),
        description: "pointer corruption with invalid magic bytes".to_string(),
        recovery_recommendations: vec!["Fix magic bytes".to_string()],
        severity: 0.8,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
    let result = checker.repair_structural_inconsistency(&corruption).await;
    assert!(result.is_ok());
    let (_success, _message, notes) = result.unwrap();
    assert!(!notes.is_empty());
    assert!(notes
        .iter()
        .any(|note| note.contains("magic") || note.contains("Invalid")));
}
// A header whose data offset points past the end of the file should be flagged
// as out of bounds during pointer validation.
//
// Fixed: the original built an unused `_checker` and then a second checker
// from a duplicated ShardexConfig; a single mutable checker is sufficient.
#[tokio::test]
async fn test_pointer_validation_data_offset_out_of_bounds() {
    let temp_dir = TempDir::new().unwrap();

    // Create a 512-byte file whose header claims data lives well past EOF.
    let test_file = temp_dir.path().join("out_of_bounds.dat");
    let file_size = 512u64;
    let mut mmf = MemoryMappedFile::create(&test_file, file_size as usize).unwrap();
    let bad_header = crate::memory::StandardHeader::new_without_checksum(
        crate::constants::magic::TEST_GENERIC,
        1,
        file_size + 1000, // data offset beyond the file's actual size
    );
    mmf.write_at(0, &bad_header).unwrap();
    mmf.sync().unwrap();
    drop(mmf);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: test_file,
        corruption_offset: Some(16),
        corruption_size: Some(8),
        description: "pointer data_offset exceeds file bounds".to_string(),
        recovery_recommendations: vec!["Fix data offset pointer".to_string()],
        severity: 0.9,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
    let result = checker.repair_structural_inconsistency(&corruption).await;
    assert!(result.is_ok());
    let (_success, _message, notes) = result.unwrap();
    assert!(!notes.is_empty());
    assert!(notes
        .iter()
        .any(|note| note.contains("exceeds file size") || note.contains("out of bounds")));
}
// A header whose data offset is not properly aligned should be flagged during
// pointer validation, with notes mentioning alignment.
//
// Fixed: the original built an unused `_checker` and then a second checker
// from a duplicated ShardexConfig; a single mutable checker is sufficient.
#[tokio::test]
async fn test_pointer_validation_alignment_issues() {
    let temp_dir = TempDir::new().unwrap();

    // Write a header whose data offset is 3 bytes past the header — misaligned.
    let test_file = temp_dir.path().join("misaligned.dat");
    let mut mmf = MemoryMappedFile::create(&test_file, 1024).unwrap();
    let misaligned_header = crate::memory::StandardHeader::new_without_checksum(
        crate::constants::magic::TEST_GENERIC,
        1,
        crate::memory::StandardHeader::SIZE as u64 + 3, // not a multiple of the required alignment
    );
    mmf.write_at(0, &misaligned_header).unwrap();
    mmf.sync().unwrap();
    drop(mmf);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: test_file,
        corruption_offset: Some(16),
        corruption_size: Some(8),
        description: "pointer alignment corruption detected".to_string(),
        recovery_recommendations: vec!["Fix pointer alignment".to_string()],
        severity: 0.5,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
    let result = checker.repair_structural_inconsistency(&corruption).await;
    assert!(result.is_ok());
    let (_success, _message, notes) = result.unwrap();
    assert!(!notes.is_empty());
    assert!(notes
        .iter()
        .any(|note| note.contains("aligned") || note.contains("alignment")));
}
// Pointer validation on a WAL file (identified by its magic bytes) should
// produce WAL-specific notes.
//
// Fixed: the original built an unused `_checker` and then a second checker
// from a duplicated ShardexConfig; a single mutable checker is sufficient.
#[tokio::test]
async fn test_pointer_validation_wal_file() {
    let temp_dir = TempDir::new().unwrap();

    // Create a file with a valid WAL header.
    let wal_file = temp_dir.path().join("test.wal");
    let mut mmf = MemoryMappedFile::create(&wal_file, 1024).unwrap();
    let wal_header = crate::memory::StandardHeader::new_without_checksum(
        crate::constants::magic::WAL,
        1,
        crate::memory::StandardHeader::SIZE as u64,
    );
    mmf.write_at(0, &wal_header).unwrap();
    mmf.sync().unwrap();
    drop(mmf);

    let corruption = CorruptionReport {
        corruption_type: CorruptionType::StructuralInconsistency,
        file_path: wal_file,
        corruption_offset: Some(100),
        corruption_size: Some(8),
        description: "pointer issues in WAL file detected".to_string(),
        recovery_recommendations: vec!["Validate WAL pointers".to_string()],
        severity: 0.4,
        is_recoverable: true,
        detected_at: SystemTime::now(),
    };

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let mut checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);
    let result = checker.repair_structural_inconsistency(&corruption).await;
    assert!(result.is_ok());
    let (_success, _message, notes) = result.unwrap();
    assert!(!notes.is_empty());
    assert!(notes
        .iter()
        .any(|note| note.contains("WAL") || note.contains("wal")));
}
// is_valid_magic accepts every production magic signature and rejects both
// arbitrary byte patterns and test-only magics.
#[test]
fn test_is_valid_magic() {
    let temp_dir = TempDir::new().unwrap();
    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(128);
    let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);

    let accepted = [
        crate::constants::magic::WAL,
        crate::constants::magic::VECTOR_STORAGE,
        crate::constants::magic::POSTING_STORAGE,
        crate::constants::magic::TEXT_INDEX,
        crate::constants::magic::TEXT_DATA,
    ];
    for magic_bytes in accepted {
        assert!(checker.is_valid_magic(magic_bytes));
    }

    let rejected = [
        b"XXXX",
        b"YYYY",
        b"ZZZZ",
        crate::constants::magic::TEST_GENERIC,
        crate::constants::magic::TEST_SHARD,
    ];
    for magic_bytes in rejected {
        assert!(!checker.is_valid_magic(magic_bytes));
    }
}
// End-to-end repair of a corrupted vector_data_offset: corrupt the header,
// run the repair with a sane target offset, and verify both the on-disk
// header and the repair notes.
#[tokio::test]
async fn test_repair_vector_data_offset_successful() {
    use crate::memory::MemoryMappedFile;
    use crate::vector_storage::{VectorStorage, VectorStorageHeader};
    use tempfile::TempDir;

    let temp_dir = TempDir::new().unwrap();
    let vector_file = temp_dir.path().join("test_vector.dat");
    drop(VectorStorage::create(&vector_file, 4, 100).unwrap());

    // Corrupt the header: point vector_data_offset 1000 bytes past EOF.
    {
        let mut mmf = MemoryMappedFile::open_read_write(&vector_file).unwrap();
        let mut header: VectorStorageHeader = mmf.read_at(0).unwrap();
        let _original_offset = header.vector_data_offset;
        let bogus_offset = mmf.len() as u64 + 1000;
        header.vector_data_offset = bogus_offset;
        mmf.write_at(0, &header).unwrap();
        let corrupted: VectorStorageHeader = mmf.read_at(0).unwrap();
        assert_eq!(corrupted.vector_data_offset, bogus_offset);
    }

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(4);
    let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);

    // A sensible offset: just past both file-level and storage-level headers.
    let reasonable_offset = std::mem::size_of::<crate::memory::StandardHeader>() as u64
        + std::mem::size_of::<VectorStorageHeader>() as u64;
    let mut notes = Vec::new();
    let outcome = checker
        .repair_vector_data_offset(&vector_file, reasonable_offset, &mut notes)
        .await;
    assert!(outcome.is_ok());
    assert!(outcome.unwrap());

    // The repaired header must carry the new offset.
    {
        let mmf = MemoryMappedFile::open_read_only(&vector_file).unwrap();
        let repaired: VectorStorageHeader = mmf.read_at(0).unwrap();
        assert_eq!(repaired.vector_data_offset, reasonable_offset);
    }

    assert!(!notes.is_empty());
    assert!(notes.iter().any(|note| note.contains("Attempting to repair")));
    assert!(notes.iter().any(|note| note.contains("Successfully updated")));
    assert!(notes.iter().any(|note| note.contains("verification successful")));
}
// repair_vector_data_offset must reject offsets that are too small, beyond
// the file's size, or not 8-byte aligned — reporting each in its notes.
#[tokio::test]
async fn test_repair_vector_data_offset_validation_failures() {
    use crate::vector_storage::VectorStorage;
    use tempfile::TempDir;

    let temp_dir = TempDir::new().unwrap();
    let vector_file = temp_dir.path().join("test_vector.dat");
    drop(VectorStorage::create(&vector_file, 4, 100).unwrap());

    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(4);
    let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);

    // Offset 50 lands inside the headers — too small.
    {
        let mut notes = Vec::new();
        let outcome = checker
            .repair_vector_data_offset(&vector_file, 50, &mut notes)
            .await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap());
        assert!(notes.iter().any(|note| note.contains("too small")));
    }

    // Offset past the end of the file.
    {
        let file_size = std::fs::metadata(&vector_file).unwrap().len();
        let mut notes = Vec::new();
        let outcome = checker
            .repair_vector_data_offset(&vector_file, file_size + 100, &mut notes)
            .await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap());
        assert!(notes.iter().any(|note| note.contains("exceeds file size")));
    }

    // Offset 1007 is not a multiple of 8.
    {
        let mut notes = Vec::new();
        let misaligned_offset = 1000 + 7;
        let outcome = checker
            .repair_vector_data_offset(&vector_file, misaligned_offset, &mut notes)
            .await;
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap());
        assert!(notes.iter().any(|note| note.contains("not 8-byte aligned")));
    }
}
// When the target file does not exist, repair_vector_data_offset returns
// Ok(false) and records the open failure in its notes rather than erroring.
#[tokio::test]
async fn test_repair_vector_data_offset_file_access_failures() {
    use tempfile::TempDir;

    let temp_dir = TempDir::new().unwrap();
    let missing_file = temp_dir.path().join("does_not_exist.dat");
    let config = ShardexConfig::new()
        .directory_path(temp_dir.path().to_path_buf())
        .vector_size(4);
    let checker = IntegrityChecker::new(temp_dir.path().to_path_buf(), config);

    let mut notes = Vec::new();
    let outcome = checker
        .repair_vector_data_offset(&missing_file, 1000, &mut notes)
        .await;
    assert!(outcome.is_ok());
    assert!(!outcome.unwrap());
    assert!(notes.iter().any(|note| note.contains("Failed to open file")));
}
}