use crate::backend::native::v2::Direction;
use crate::backend::native::v2::free_space::FreeSpaceManager;
use crate::backend::native::v2::node_record_v2::NodeRecordV2;
use crate::backend::native::v2::string_table::StringTable;
use crate::backend::native::v2::wal::V2WALRecord;
use crate::backend::native::v2::wal::recovery::core::TransactionState;
use crate::backend::native::v2::wal::recovery::errors::{RecoveryError, RecoveryResult};
use crate::backend::native::{EdgeStore, GraphFile, NodeStore};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
pub mod cluster_validation;
pub mod cross_record;
pub mod edge_validation;
pub mod free_space_validation;
pub mod node_validation;
pub mod string_validation;
pub use cluster_validation::validate_cluster_create;
pub use cross_record::{validate_cross_record_consistency, validate_v2_invariants};
pub use edge_validation::{validate_edge_delete, validate_edge_insert, validate_edge_update};
pub use free_space_validation::{validate_free_space_allocate, validate_free_space_deallocate};
pub use node_validation::{validate_node_delete, validate_node_insert, validate_node_update};
pub use string_validation::validate_string_insert;
/// Outcome of validating a single WAL record or an entire transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationResult {
    /// Every check passed.
    Valid,
    /// Problems were found, but recovery can still proceed.
    Recoverable {
        /// Human-readable description of each issue found.
        issues: Vec<String>,
        /// Overall severity assigned to the collected issues.
        severity: ValidationSeverity,
    },
    /// Validation failed; the record/transaction must not be replayed.
    Invalid {
        /// All individual error messages collected.
        errors: Vec<String>,
        /// The single error considered fatal for this validation.
        critical_error: String,
    },
}
/// Severity ranking for recoverable validation issues.
///
/// Variants are declared least-severe first; the derived `PartialOrd`/`Ord`
/// rely on this declaration order (`Warning < Error < Critical`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ValidationSeverity {
    Warning,
    Error,
    Critical,
}
/// Aggregated counters describing the outcome of a validation run.
#[derive(Debug, Clone, Default)]
pub struct ValidationStatistics {
    pub total_records: u64,
    pub valid_records: u64,
    pub recoverable_records: u64,
    pub invalid_records: u64,
    pub node_inconsistencies: u64,
    pub cluster_inconsistencies: u64,
    pub string_table_inconsistencies: u64,
    pub free_space_inconsistencies: u64,
}

impl ValidationStatistics {
    /// Percentage of records that validated cleanly.
    ///
    /// Returns `0.0` when no records have been counted yet.
    pub fn success_rate(&self) -> f64 {
        match self.total_records {
            0 => 0.0,
            total => self.valid_records as f64 / total as f64 * 100.0,
        }
    }

    /// One-line human-readable report of the counters.
    pub fn summary(&self) -> String {
        format!(
            "Validation Summary: {} records, {:.1}% valid, {} recoverable, {} invalid",
            self.total_records,
            self.success_rate(),
            self.recoverable_records,
            self.invalid_records
        )
    }
}
/// Cached bookkeeping for one edge cluster, keyed elsewhere by
/// `(node_id, Direction)`.
#[derive(Debug, Clone)]
pub(crate) struct ClusterMetadata {
    // Byte offset of the cluster within the graph file.
    offset: u64,
    // Allocated size of the cluster in bytes.
    size: u32,
    // Number of edges recorded in the cluster.
    // NOTE(review): only ever initialized to 0 in this file — confirm the
    // validation submodules update it.
    edge_count: u32,
    // LSN of the last record that modified this cluster.
    // NOTE(review): initialized to 0 here; presumably maintained elsewhere.
    last_modified_lsn: u64,
    // LSN of the ClusterCreate record that created this cluster.
    // NOTE(review): also initialized to 0 rather than the creating LSN.
    created_lsn: u64,
}
/// Validates individual WAL transactions against cached V2 store state.
///
/// The store handles are `None` until [`TransactionValidator::initialize`]
/// opens the graph file and populates them.
pub struct TransactionValidator {
    // Path to the on-disk V2 graph file being validated.
    graph_file_path: PathBuf,
    graph_file: Arc<Mutex<Option<GraphFile>>>,
    node_store: Arc<Mutex<Option<NodeStore<'static>>>>,
    edge_store: Arc<Mutex<Option<EdgeStore<'static>>>>,
    string_table: Arc<Mutex<Option<StringTable>>>,
    free_space_manager: Arc<Mutex<Option<FreeSpaceManager>>>,
    // In-memory shadow state rebuilt from validated WAL records; exposed
    // read-only to the validation submodules via the accessor methods below.
    node_cache: HashMap<i64, NodeRecordV2>,
    cluster_metadata: HashMap<(i64, Direction), ClusterMetadata>,
    string_cache: HashMap<u32, String>,
    free_space_regions: HashSet<(u64, u32)>,
}
impl TransactionValidator {
/// Creates a validator for the graph file at `graph_file_path`.
///
/// All store handles start out empty; call [`Self::initialize`] before
/// validating any transactions.
pub fn new(graph_file_path: PathBuf) -> RecoveryResult<Self> {
    Ok(Self {
        graph_file_path,
        graph_file: Arc::new(Mutex::new(None)),
        node_store: Arc::new(Mutex::new(None)),
        edge_store: Arc::new(Mutex::new(None)),
        string_table: Arc::new(Mutex::new(None)),
        free_space_manager: Arc::new(Mutex::new(None)),
        node_cache: HashMap::new(),
        cluster_metadata: HashMap::new(),
        string_cache: HashMap::new(),
        free_space_regions: HashSet::new(),
    })
}
pub fn initialize(&mut self) -> RecoveryResult<()> {
let mut graph_file = GraphFile::open(&self.graph_file_path).map_err(|e| {
RecoveryError::validation(format!(
"Failed to open V2 graph file for validation: {}",
e
))
})?;
let node_store = unsafe { create_node_store(&mut graph_file) };
let edge_store = unsafe { create_edge_store(&mut graph_file) };
let string_table = StringTable::new();
let free_space_manager = FreeSpaceManager::new(
crate::backend::native::v2::free_space::AllocationStrategy::FirstFit,
);
match self.graph_file.lock() {
Ok(mut guard) => *guard = Some(graph_file),
Err(poisoned) => {
eprintln!(
"WARNING: Graph file mutex poisoned during validator initialization. Recovering..."
);
*poisoned.into_inner() = Some(graph_file);
}
}
match self.node_store.lock() {
Ok(mut guard) => *guard = Some(node_store),
Err(poisoned) => {
eprintln!(
"WARNING: Node store mutex poisoned during validator initialization. Recovering..."
);
*poisoned.into_inner() = Some(node_store);
}
}
match self.edge_store.lock() {
Ok(mut guard) => *guard = Some(edge_store),
Err(poisoned) => {
eprintln!(
"WARNING: Edge store mutex poisoned during validator initialization. Recovering..."
);
*poisoned.into_inner() = Some(edge_store);
}
}
match self.string_table.lock() {
Ok(mut guard) => *guard = Some(string_table),
Err(poisoned) => {
eprintln!(
"WARNING: String table mutex poisoned during validator initialization. Recovering..."
);
*poisoned.into_inner() = Some(string_table);
}
}
match self.free_space_manager.lock() {
Ok(mut guard) => *guard = Some(free_space_manager),
Err(poisoned) => {
eprintln!(
"WARNING: Free space manager mutex poisoned during validator initialization. Recovering..."
);
*poisoned.into_inner() = Some(free_space_manager);
}
}
Ok(())
}
/// Validates every record in `transaction`, then the cross-record and V2
/// invariants, and combines the findings into one [`ValidationResult`].
///
/// Valid records are folded into the shadow caches as they are accepted, so
/// later records in the same transaction are checked against updated state.
/// Severity of a `Recoverable` result is derived from marker words
/// ("Critical" / "Error") embedded in the issue strings.
///
/// # Errors
/// Propagates any error raised while validating an individual record, the
/// cross-record consistency, or the V2 invariants.
pub fn validate_transaction(
    &mut self,
    transaction: &TransactionState,
) -> RecoveryResult<ValidationResult> {
    let mut issues = Vec::new();
    let mut errors = Vec::new();
    for (i, record) in transaction.records.iter().enumerate() {
        // Match by value so issue/error strings are moved, not cloned.
        match self.validate_record(record, transaction.start_lsn)? {
            ValidationResult::Valid => {
                self.update_validation_cache(record)?;
            }
            ValidationResult::Recoverable {
                issues: record_issues,
                ..
            } => {
                issues.extend(record_issues);
            }
            ValidationResult::Invalid {
                errors: record_errors,
                critical_error,
            } => {
                errors.push(format!("Record {}: {}", i, critical_error));
                errors.extend(record_errors);
            }
        }
    }
    issues.extend(validate_cross_record_consistency(self, transaction)?);
    issues.extend(validate_v2_invariants(self, transaction)?);
    if !errors.is_empty() {
        Ok(ValidationResult::Invalid {
            errors,
            critical_error: "Transaction contains invalid records".to_string(),
        })
    } else if !issues.is_empty() {
        let severity = if issues.iter().any(|i| i.contains("Critical")) {
            ValidationSeverity::Critical
        } else if issues.iter().any(|i| i.contains("Error")) {
            ValidationSeverity::Error
        } else {
            ValidationSeverity::Warning
        };
        Ok(ValidationResult::Recoverable { issues, severity })
    } else {
        Ok(ValidationResult::Valid)
    }
}
/// Dispatches a single WAL record to its type-specific validator.
///
/// `lsn` is the owning transaction's start LSN (see `validate_transaction`)
/// and is threaded through for use in error messages. Pure transaction
/// control / bookkeeping records are always reported `Valid` here.
fn validate_record(&self, record: &V2WALRecord, lsn: u64) -> RecoveryResult<ValidationResult> {
    match record {
        // Node lifecycle records — delegated to node_validation.
        V2WALRecord::NodeInsert {
            node_id, node_data, ..
        } => validate_node_insert(self, *node_id, node_data, lsn),
        V2WALRecord::NodeUpdate {
            node_id,
            old_data,
            new_data,
            ..
        } => validate_node_update(self, *node_id, old_data, new_data, lsn),
        V2WALRecord::NodeDelete {
            node_id, old_data, ..
        } => validate_node_delete(self, *node_id, old_data, lsn),
        // Cluster creation — delegated to cluster_validation.
        V2WALRecord::ClusterCreate {
            node_id,
            direction,
            cluster_offset,
            cluster_size,
            edge_data,
            ..
        } => validate_cluster_create(
            self,
            *node_id,
            *direction,
            *cluster_offset,
            *cluster_size,
            edge_data,
            lsn,
        ),
        // Edge records — the whole record is handed to edge_validation.
        V2WALRecord::EdgeInsert { .. } => validate_edge_insert(self, record, lsn),
        V2WALRecord::EdgeUpdate { .. } => validate_edge_update(self, record, lsn),
        V2WALRecord::EdgeDelete { .. } => validate_edge_delete(self, record, lsn),
        V2WALRecord::StringInsert {
            string_id,
            string_value,
            ..
        } => validate_string_insert(self, *string_id, string_value, lsn),
        // Free-space bookkeeping records.
        V2WALRecord::FreeSpaceAllocate {
            block_offset,
            block_size,
            ..
        } => validate_free_space_allocate(self, *block_offset, *block_size, lsn),
        V2WALRecord::FreeSpaceDeallocate {
            block_offset,
            block_size,
            ..
        } => validate_free_space_deallocate(self, *block_offset, *block_size, lsn),
        // Control / bookkeeping records carry no store mutations to check.
        V2WALRecord::TransactionBegin { .. }
        | V2WALRecord::TransactionCommit { .. }
        | V2WALRecord::TransactionRollback { .. }
        | V2WALRecord::TransactionPrepare { .. }
        | V2WALRecord::TransactionAbort { .. }
        | V2WALRecord::SavepointCreate { .. }
        | V2WALRecord::SavepointRollback { .. }
        | V2WALRecord::SavepointRelease { .. }
        | V2WALRecord::BackupCreate { .. }
        | V2WALRecord::BackupRestore { .. }
        | V2WALRecord::LockAcquire { .. }
        | V2WALRecord::LockRelease { .. }
        | V2WALRecord::IndexUpdate { .. }
        | V2WALRecord::StatisticsUpdate { .. }
        | V2WALRecord::AllocateContiguous { .. }
        | V2WALRecord::CommitContiguous { .. }
        | V2WALRecord::RollbackContiguous { .. }
        | V2WALRecord::Checkpoint { .. }
        | V2WALRecord::HeaderUpdate { .. }
        | V2WALRecord::SegmentEnd { .. } => Ok(ValidationResult::Valid),
        // Key/value records are validated inline: key must be non-empty and
        // the (old_)version must be non-zero.
        V2WALRecord::KvSet {
            key,
            value_bytes: _,
            version,
            ..
        } => {
            if key.is_empty() {
                Ok(ValidationResult::Invalid {
                    errors: vec!["KvSet key cannot be empty".to_string()],
                    critical_error: format!("KvSet at LSN {} has empty key", lsn),
                })
            } else if *version == 0 {
                Ok(ValidationResult::Invalid {
                    errors: vec!["KvSet version cannot be zero".to_string()],
                    critical_error: format!("KvSet at LSN {} has invalid version", lsn),
                })
            } else {
                Ok(ValidationResult::Valid)
            }
        }
        V2WALRecord::KvDelete {
            key, old_version, ..
        } => {
            if key.is_empty() {
                Ok(ValidationResult::Invalid {
                    errors: vec!["KvDelete key cannot be empty".to_string()],
                    critical_error: format!("KvDelete at LSN {} has empty key", lsn),
                })
            } else if *old_version == 0 {
                Ok(ValidationResult::Invalid {
                    errors: vec!["KvDelete old_version cannot be zero".to_string()],
                    critical_error: format!("KvDelete at LSN {} has invalid version", lsn),
                })
            } else {
                Ok(ValidationResult::Valid)
            }
        }
    }
}
/// Folds a successfully validated record into the in-memory shadow caches.
///
/// Records that carry no cache-relevant state are ignored; a node payload
/// that fails to deserialize is silently skipped rather than cached.
fn update_validation_cache(&mut self, record: &V2WALRecord) -> RecoveryResult<()> {
    match record {
        V2WALRecord::NodeInsert {
            node_id: id,
            node_data: data,
            ..
        } => {
            if let Ok(parsed) = NodeRecordV2::deserialize(data) {
                self.node_cache.insert(*id, parsed);
            }
        }
        V2WALRecord::ClusterCreate {
            node_id: id,
            direction: dir,
            cluster_offset: offset,
            cluster_size: size,
            ..
        } => {
            self.cluster_metadata.insert(
                (*id, *dir),
                ClusterMetadata {
                    offset: *offset,
                    size: *size,
                    edge_count: 0,
                    last_modified_lsn: 0,
                    created_lsn: 0,
                },
            );
        }
        V2WALRecord::StringInsert {
            string_id: id,
            string_value: value,
            ..
        } => {
            self.string_cache.insert(*id, value.clone());
        }
        V2WALRecord::FreeSpaceAllocate {
            block_offset: offset,
            block_size: size,
            ..
        } => {
            self.free_space_regions.insert((*offset, *size));
        }
        V2WALRecord::FreeSpaceDeallocate {
            block_offset: offset,
            block_size: size,
            ..
        } => {
            self.free_space_regions.remove(&(*offset, *size));
        }
        // Everything else (transaction control, edges, KV, ...) does not
        // feed these caches.
        _ => {}
    }
    Ok(())
}
/// Read-only view of node records rebuilt from validated WAL records.
pub(crate) fn node_cache(&self) -> &HashMap<i64, NodeRecordV2> {
    &self.node_cache
}
/// Read-only view of cluster metadata keyed by `(node_id, Direction)`.
pub(crate) fn cluster_metadata(&self) -> &HashMap<(i64, Direction), ClusterMetadata> {
    &self.cluster_metadata
}
/// Read-only view of strings observed via `StringInsert` records.
pub(crate) fn string_cache(&self) -> &HashMap<u32, String> {
    &self.string_cache
}
/// Read-only view of currently allocated `(offset, size)` regions.
pub(crate) fn free_space_regions(&self) -> &HashSet<(u64, u32)> {
    &self.free_space_regions
}
}
/// High-level recovery validator: wraps a [`TransactionValidator`] and adds
/// whole-database integrity and multi-transaction sequence checks.
pub struct RecoveryValidator {
    // Per-transaction validator, initialized against the same graph file.
    transaction_validator: TransactionValidator,
    // Path kept so integrity checks can re-open the graph file directly.
    graph_file_path: PathBuf,
}
impl RecoveryValidator {
/// Builds a recovery validator for the graph file at `graph_file_path`,
/// eagerly initializing the inner transaction validator.
///
/// # Errors
/// Propagates failures from constructing or initializing the inner validator.
pub fn new(graph_file_path: PathBuf) -> RecoveryResult<Self> {
    let mut validator = TransactionValidator::new(graph_file_path.clone())?;
    validator.initialize()?;
    Ok(Self {
        transaction_validator: validator,
        graph_file_path,
    })
}
/// Runs static integrity checks against the graph file header.
///
/// Checks, in order: the header's own self-validation (fatal on failure),
/// magic bytes, file-format version, and the basic ordering of the
/// node/edge data offsets. All non-fatal failures are accumulated and
/// reported together. (The original code also carried an always-empty
/// `issues` vector whose `Recoverable` branch was unreachable; it has been
/// removed — this method only ever yields `Valid` or `Invalid`.)
///
/// # Errors
/// Returns a validation error when the graph file cannot be opened.
pub fn validate_database_integrity(&self) -> RecoveryResult<ValidationResult> {
    let mut errors = Vec::new();
    let graph_file = GraphFile::open(&self.graph_file_path).map_err(|e| {
        RecoveryError::validation(format!(
            "Failed to open graph file for integrity check: {}",
            e
        ))
    })?;
    let header = graph_file.persistent_header();
    // A header that fails its own validation is unconditionally fatal.
    if let Err(e) = header.validate() {
        return Ok(ValidationResult::Invalid {
            errors: vec![format!("Persistent header validation failed: {}", e)],
            critical_error: "Graph file header is corrupted or invalid".to_string(),
        });
    }
    let expected_magic = crate::backend::native::constants::MAGIC_BYTES;
    if header.magic != expected_magic {
        errors.push(format!(
            "Magic number mismatch: expected {:?}, found {:?}",
            String::from_utf8_lossy(&expected_magic),
            String::from_utf8_lossy(&header.magic)
        ));
    }
    let expected_version = crate::backend::native::constants::FILE_FORMAT_VERSION;
    if header.version != expected_version {
        errors.push(format!(
            "File version mismatch: expected {}, found {}",
            expected_version, header.version
        ));
    }
    use crate::backend::native::constants::HEADER_SIZE;
    // Node data must start at or after the end of the fixed header.
    if header.node_data_offset < HEADER_SIZE as u64 {
        errors.push(format!(
            "node_data_offset {} is less than header size {}",
            header.node_data_offset, HEADER_SIZE
        ));
    }
    // Edge data must follow node data.
    if header.edge_data_offset < header.node_data_offset {
        errors.push(format!(
            "edge_data_offset {} is less than node_data_offset {}",
            header.edge_data_offset, header.node_data_offset
        ));
    }
    if errors.is_empty() {
        Ok(ValidationResult::Valid)
    } else {
        Ok(ValidationResult::Invalid {
            errors,
            critical_error: "Database integrity check failed".to_string(),
        })
    }
}
/// Validates `transactions` in order, accumulating statistics and issues.
///
/// Counters are bucketed per transaction: every record of a transaction is
/// counted under the transaction's overall verdict, and `total_records`
/// grows only for transactions that completed validation.
///
/// # Errors
/// Returns an error for the first transaction that validates as `Invalid`
/// (statistics gathered up to that point are discarded, as in the original
/// behavior), or propagates any error from the underlying validator.
pub fn validate_recovery_sequence(
    &mut self,
    transactions: &[TransactionState],
) -> RecoveryResult<(ValidationStatistics, Vec<String>)> {
    let mut stats = ValidationStatistics::default();
    let mut all_issues = Vec::new();
    for transaction in transactions {
        let record_count = transaction.records.len() as u64;
        match self.transaction_validator.validate_transaction(transaction)? {
            ValidationResult::Valid => {
                stats.valid_records += record_count;
            }
            ValidationResult::Recoverable { issues, .. } => {
                stats.recoverable_records += record_count;
                all_issues.extend(issues);
            }
            ValidationResult::Invalid { errors, .. } => {
                // The early return drops `stats`, so no counter update is
                // needed here (the original incremented a counter it then
                // discarded).
                return Err(RecoveryError::validation(format!(
                    "Critical validation errors in recovery sequence: {}",
                    errors.join("; ")
                )));
            }
        }
        stats.total_records += record_count;
    }
    Ok((stats, all_issues))
}
}
/// Builds a `NodeStore` whose borrow of `graph_file` is transmuted to `'static`.
///
/// # Safety
/// The returned store holds a `'static` mutable reference that really points
/// at `*graph_file`. The caller must guarantee that the `GraphFile` outlives
/// the store and that the referenced data is not moved or aliased while the
/// store exists. NOTE(review): the transmute erases the real lifetime
/// entirely — a lifetime-parameterized design would be sounder.
pub unsafe fn create_node_store(graph_file: &mut GraphFile) -> NodeStore<'static> {
    // SAFETY: upholding the lifetime/aliasing contract is delegated to the
    // caller (see `# Safety` above).
    unsafe { NodeStore::new(mem::transmute::<&mut _, &'static mut _>(graph_file)) }
}
/// Builds an `EdgeStore` whose borrow of `graph_file` is transmuted to `'static`.
///
/// # Safety
/// Same contract as `create_node_store`: the `GraphFile` must outlive the
/// returned store and must not be moved or aliased while the store exists.
pub unsafe fn create_edge_store(graph_file: &mut GraphFile) -> EdgeStore<'static> {
    // SAFETY: upholding the lifetime/aliasing contract is delegated to the
    // caller (see `# Safety` above).
    unsafe { EdgeStore::new(mem::transmute::<&mut _, &'static mut _>(graph_file)) }
}
// Validation limits for the V2 format. Values are unchanged from the
// original single-line declarations; names describe their intended subject.
/// Cap on a serialized node record, in bytes (4 KiB).
pub const MAX_NODE_RECORD_SIZE: usize = 4096;
/// Cap on an edge cluster, in bytes (1 MiB).
pub const MAX_CLUSTER_SIZE: u32 = 1024 * 1024;
/// Cap on the number of edges in one cluster.
pub const MAX_EDGES_PER_CLUSTER: u32 = 10000;
/// Cap on a serialized edge record, in bytes (1 KiB).
pub const MAX_EDGE_RECORD_SIZE: usize = 1024;
/// Cap on a cluster insertion point value.
pub const MAX_INSERTION_POINT: u32 = 100000;
/// Cap on a string-table entry length, in bytes.
pub const MAX_STRING_LENGTH: usize = 10000;
/// Alignment required of V2 cluster offsets, in bytes.
pub const V2_CLUSTER_ALIGNMENT: u64 = 64;
/// Alignment required of V2 block offsets, in bytes (4 KiB).
pub const V2_BLOCK_ALIGNMENT: u64 = 4096;
/// Smallest valid free-space block, in bytes.
pub const MIN_BLOCK_SIZE: u32 = 64;
/// Largest valid free-space block, in bytes (1 MiB).
pub const MAX_BLOCK_SIZE: u32 = 1024 * 1024;
/// Cap on WAL records per transaction.
pub const MAX_RECORDS_PER_TRANSACTION: usize = 10000;
/// Cap on total bytes allocated by one transaction (100 MiB).
pub const MAX_TRANSACTION_ALLOCATION: u64 = 100 * 1024 * 1024;