use crate::backend::native::v2::wal::V2WALConfig;
use crate::backend::native::v2::wal::checkpoint::constants::*;
use crate::backend::native::v2::wal::checkpoint::core::{
CheckpointManagerState, CheckpointState, DirtyBlockTracker,
};
use crate::backend::native::v2::wal::checkpoint::errors::{CheckpointError, CheckpointResult};
use std::fs;
use std::time::SystemTime;
#[derive(Debug, Clone, serde::Serialize)]
pub struct V2InvariantResult {
pub invariants_held: bool,
pub violations: Vec<V2InvariantViolation>,
pub validation_timestamp: SystemTime,
pub v2_version: Option<u32>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct V2InvariantViolation {
pub violation_type: V2InvariantViolationType,
pub description: String,
pub expected: Option<String>,
pub actual: Option<String>,
pub critical: bool,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum V2InvariantViolationType {
InvalidV2Version,
V2BlockSizeMismatch,
V2ClusterAlignmentMismatch,
V2MetadataCorruption,
ClusterBoundaryViolation,
BlockAlignmentViolation,
V2StringTableViolation,
V2FreeSpaceViolation,
ClusteredEdgeFormatViolation,
NodeRecordVersionMismatch,
}
pub struct V2InvariantValidator {
config: V2WALConfig,
}
impl V2InvariantValidator {
pub fn new(config: V2WALConfig) -> Self {
Self { config }
}
pub fn validate_v2_metadata(
&self,
checkpoint_path: &std::path::Path,
) -> CheckpointResult<V2InvariantResult> {
let mut violations = Vec::new();
let start_time = SystemTime::now();
let mut file = fs::File::open(checkpoint_path).map_err(|e| {
CheckpointError::validation(format!("Failed to open checkpoint file: {}", e))
})?;
use std::io::{Read, Seek, SeekFrom};
file.seek(SeekFrom::Start(36)).map_err(|e| {
CheckpointError::validation(format!("Failed to seek to V2 metadata: {}", e))
})?;
let mut v2_version_bytes = [0u8; 4];
file.read_exact(&mut v2_version_bytes).map_err(|e| {
CheckpointError::validation(format!("Failed to read V2 version: {}", e))
})?;
let v2_version = u32::from_le_bytes(v2_version_bytes);
if v2_version != 2 {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::InvalidV2Version,
description: format!(
"Unsupported V2 checkpoint version: {} (expected: 2)",
v2_version
),
expected: Some("2".to_string()),
actual: Some(v2_version.to_string()),
critical: true,
});
}
let mut block_size_bytes = [0u8; 8];
file.read_exact(&mut block_size_bytes).map_err(|e| {
CheckpointError::validation(format!("Failed to read V2 block size: {}", e))
})?;
let block_size = u64::from_le_bytes(block_size_bytes);
if block_size != v2::V2_GRAPH_BLOCK_SIZE {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2BlockSizeMismatch,
description: format!(
"Invalid V2 block size: {} (expected: {})",
block_size,
v2::V2_GRAPH_BLOCK_SIZE
),
expected: Some(v2::V2_GRAPH_BLOCK_SIZE.to_string()),
actual: Some(block_size.to_string()),
critical: true,
});
}
let mut alignment_bytes = [0u8; 8];
file.read_exact(&mut alignment_bytes).map_err(|e| {
CheckpointError::validation(format!("Failed to read V2 cluster alignment: {}", e))
})?;
let alignment = u64::from_le_bytes(alignment_bytes);
if alignment != v2::V2_CLUSTER_ALIGNMENT {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2ClusterAlignmentMismatch,
description: format!(
"Invalid V2 cluster alignment: {} (expected: {})",
alignment,
v2::V2_CLUSTER_ALIGNMENT
),
expected: Some(v2::V2_CLUSTER_ALIGNMENT.to_string()),
actual: Some(alignment.to_string()),
critical: true,
});
}
let invariants_held = violations
.iter()
.all(|v: &V2InvariantViolation| !v.critical);
Ok(V2InvariantResult {
invariants_held,
violations,
validation_timestamp: start_time,
v2_version: Some(v2_version),
})
}
pub fn validate_cluster_alignment_invariants(
&self,
_dirty_blocks: &DirtyBlockTracker,
) -> CheckpointResult<V2InvariantResult> {
let violations = Vec::new();
let start_time = SystemTime::now();
let _alignment = v2::V2_CLUSTER_ALIGNMENT;
let invariants_held = violations
.iter()
.all(|v: &V2InvariantViolation| !v.critical);
Ok(V2InvariantResult {
invariants_held,
violations,
validation_timestamp: start_time,
v2_version: None,
})
}
pub fn validate_checkpoint_state_invariants(
&self,
state: &CheckpointState,
manager_state: &CheckpointManagerState,
) -> CheckpointResult<V2InvariantResult> {
let mut violations = Vec::new();
let start_time = SystemTime::now();
if manager_state.in_progress && matches!(state, CheckpointState::Idle) {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: "Checkpoint marked as in_progress but state is Idle".to_string(),
expected: Some("state != Idle when in_progress is true".to_string()),
actual: Some(format!("state = {:?}, in_progress = true", state)),
critical: true,
});
}
if !manager_state.in_progress
&& (matches!(state, CheckpointState::Initializing)
|| matches!(state, CheckpointState::Collecting)
|| matches!(state, CheckpointState::Processing)
|| matches!(state, CheckpointState::Flushing)
|| matches!(state, CheckpointState::Validating))
{
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: format!(
"Checkpoint state {:?} indicates active work but in_progress is false",
state
),
expected: Some("in_progress = true for active states".to_string()),
actual: Some("in_progress = false".to_string()),
critical: true,
});
}
if manager_state.checkpoint_start_time.is_some() {
if matches!(state, CheckpointState::Idle) {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: "Checkpoint has start_time but state is Idle".to_string(),
expected: Some("state != Idle when start_time is Some".to_string()),
actual: Some(format!("state = {:?}, start_time = Some", state)),
critical: false,
});
}
}
if matches!(state, CheckpointState::Complete) {
if manager_state.completed_checkpoints == 0 && manager_state.current_operation_id == 0 {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: "Checkpoint state is Complete but no checkpoints recorded"
.to_string(),
expected: Some("completed_checkpoints > 0 when state is Complete".to_string()),
actual: Some("completed_checkpoints = 0, state = Complete".to_string()),
critical: false,
});
}
}
if manager_state.current_state != *state {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: format!(
"Checkpoint state mismatch: state parameter {:?} != manager_state.current_state {:?}",
state, manager_state.current_state
),
expected: Some("state == manager_state.current_state".to_string()),
actual: Some(format!("state ({:?}) != current_state ({:?})", state, manager_state.current_state)),
critical: true,
});
}
let invariants_held = violations
.iter()
.all(|v: &V2InvariantViolation| !v.critical);
Ok(V2InvariantResult {
invariants_held,
violations,
validation_timestamp: start_time,
v2_version: None,
})
}
pub fn validate_v2_format_compatibility(
&self,
checkpoint_path: &std::path::Path,
) -> CheckpointResult<V2InvariantResult> {
let mut violations = Vec::new();
let start_time = SystemTime::now();
let mut file = fs::File::open(checkpoint_path).map_err(|e| {
CheckpointError::validation(format!("Failed to open checkpoint file: {}", e))
})?;
use std::io::Read;
let mut magic = [0u8; 4];
file.read_exact(&mut magic).map_err(|e| {
CheckpointError::validation(format!("Failed to read checkpoint magic: {}", e))
})?;
if magic != *CHECKPOINT_MAGIC {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: "Invalid checkpoint magic number".to_string(),
expected: Some(format!("{:?}", CHECKPOINT_MAGIC)),
actual: Some(format!("{:?}", magic)),
critical: true,
});
}
let mut version_bytes = [0u8; 4];
file.read_exact(&mut version_bytes).map_err(|e| {
CheckpointError::validation(format!("Failed to read checkpoint version: {}", e))
})?;
let version = u32::from_le_bytes(version_bytes);
if version != CHECKPOINT_VERSION {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::NodeRecordVersionMismatch,
description: format!("Unsupported checkpoint version: {}", version),
expected: Some(CHECKPOINT_VERSION.to_string()),
actual: Some(version.to_string()),
critical: true,
});
}
let metadata = fs::metadata(checkpoint_path).map_err(|e| {
CheckpointError::validation(format!("Failed to read checkpoint metadata: {}", e))
})?;
if metadata.len() < MIN_CHECKPOINT_SIZE {
violations.push(V2InvariantViolation {
violation_type: V2InvariantViolationType::V2MetadataCorruption,
description: format!(
"Checkpoint file too small for V2 format: {} bytes (minimum: {})",
metadata.len(),
MIN_CHECKPOINT_SIZE
),
expected: Some(format!(">= {} bytes", MIN_CHECKPOINT_SIZE)),
actual: Some(format!("{} bytes", metadata.len())),
critical: true,
});
}
let invariants_held = violations
.iter()
.all(|v: &V2InvariantViolation| !v.critical);
Ok(V2InvariantResult {
invariants_held,
violations,
validation_timestamp: start_time,
v2_version: Some(version),
})
}
pub fn validate_v2_graph_file_invariants(
&self,
_graph_file_path: &std::path::Path,
) -> CheckpointResult<V2InvariantResult> {
let violations = Vec::new();
let start_time = SystemTime::now();
let invariants_held = true;
Ok(V2InvariantResult {
invariants_held,
violations,
validation_timestamp: start_time,
v2_version: None,
})
}
pub fn validate_comprehensive_v2_invariants(
&self,
checkpoint_path: &std::path::Path,
dirty_blocks: &DirtyBlockTracker,
checkpoint_state: &CheckpointState,
manager_state: &CheckpointManagerState,
) -> CheckpointResult<V2InvariantResult> {
let mut all_violations = Vec::new();
#[allow(unused_assignments)]
let mut v2_version = None;
let metadata_result = self.validate_v2_metadata(checkpoint_path)?;
v2_version = metadata_result.v2_version;
all_violations.extend(metadata_result.violations);
let alignment_result = self.validate_cluster_alignment_invariants(dirty_blocks)?;
all_violations.extend(alignment_result.violations);
let state_result =
self.validate_checkpoint_state_invariants(checkpoint_state, manager_state)?;
all_violations.extend(state_result.violations);
let format_result = self.validate_v2_format_compatibility(checkpoint_path)?;
all_violations.extend(format_result.violations);
let invariants_held = all_violations.iter().all(|v| !v.critical);
Ok(V2InvariantResult {
invariants_held,
violations: all_violations,
validation_timestamp: SystemTime::now(),
v2_version,
})
}
}
pub struct V2InvariantUtils;
impl V2InvariantUtils {
pub fn is_critical_violation(violation: &V2InvariantViolation) -> bool {
violation.critical
|| match violation.violation_type {
V2InvariantViolationType::InvalidV2Version => true,
V2InvariantViolationType::V2BlockSizeMismatch => true,
V2InvariantViolationType::V2ClusterAlignmentMismatch => true,
V2InvariantViolationType::V2MetadataCorruption => true,
V2InvariantViolationType::NodeRecordVersionMismatch => true,
_ => false,
}
}
pub fn get_violation_severity(violation: &V2InvariantViolation) -> V2InvariantSeverity {
if Self::is_critical_violation(violation) {
V2InvariantSeverity::Critical
} else {
match violation.violation_type {
V2InvariantViolationType::ClusterBoundaryViolation => V2InvariantSeverity::Error,
V2InvariantViolationType::BlockAlignmentViolation => V2InvariantSeverity::Warning,
V2InvariantViolationType::ClusteredEdgeFormatViolation => {
V2InvariantSeverity::Error
}
V2InvariantViolationType::V2StringTableViolation => V2InvariantSeverity::Warning,
V2InvariantViolationType::V2FreeSpaceViolation => V2InvariantSeverity::Warning,
_ => V2InvariantSeverity::Error,
}
}
}
pub fn calculate_compliance_score(violations: &[V2InvariantViolation]) -> f64 {
if violations.is_empty() {
return 1.0;
}
let critical_count = violations
.iter()
.filter(|v| Self::is_critical_violation(v))
.count();
let total_count = violations.len();
if critical_count > 0 {
return 0.0; }
1.0 - (total_count as f64 * 0.1).max(0.0).min(1.0)
}
pub fn get_violation_summary(violations: &[V2InvariantViolation]) -> V2InvariantSummary {
let mut summary = V2InvariantSummary::default();
for violation in violations {
summary.total_violations += 1;
match Self::get_violation_severity(violation) {
V2InvariantSeverity::Critical => summary.critical_violations += 1,
V2InvariantSeverity::Error => summary.error_violations += 1,
V2InvariantSeverity::Warning => summary.warning_violations += 1,
V2InvariantSeverity::Info => summary.info_violations += 1,
}
}
summary.compliance_score = Self::calculate_compliance_score(violations);
summary
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum V2InvariantSeverity {
Info = 0,
Warning = 1,
Error = 2,
Critical = 3,
}
#[derive(Debug, Clone, Default)]
pub struct V2InvariantSummary {
pub total_violations: usize,
pub critical_violations: usize,
pub error_violations: usize,
pub warning_violations: usize,
pub info_violations: usize,
pub compliance_score: f64,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_v2_invariant_validator_creation() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
assert!(true, "V2 invariant validator created successfully");
}
#[test]
fn test_cluster_alignment_invariants() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let dirty_blocks = DirtyBlockTracker::new(100, 100);
let result = validator.validate_cluster_alignment_invariants(&dirty_blocks);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
}
#[test]
fn test_checkpoint_state_invariants() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let state = CheckpointState::default();
let manager_state = CheckpointManagerState::default();
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
}
#[test]
fn test_valid_state_transitions() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let state = CheckpointState::Idle;
let mut manager_state = CheckpointManagerState::default();
manager_state.in_progress = false;
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
let state = CheckpointState::Collecting;
manager_state.current_state = CheckpointState::Collecting;
manager_state.in_progress = true;
manager_state.checkpoint_start_time = Some(std::time::Instant::now());
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
let state = CheckpointState::Complete;
manager_state.current_state = CheckpointState::Complete;
manager_state.in_progress = false;
manager_state.completed_checkpoints = 1;
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
}
#[test]
fn test_invalid_state_transition_idle_with_in_progress() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let state = CheckpointState::Idle;
let mut manager_state = CheckpointManagerState::default();
manager_state.current_state = CheckpointState::Idle;
manager_state.in_progress = true;
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(!invariant_result.invariants_held);
assert!(!invariant_result.violations.is_empty());
assert!(invariant_result.violations.iter().any(|v| v.critical
&& matches!(
v.violation_type,
V2InvariantViolationType::V2MetadataCorruption
)));
}
#[test]
fn test_invalid_state_active_without_in_progress() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let state = CheckpointState::Processing;
let mut manager_state = CheckpointManagerState::default();
manager_state.current_state = CheckpointState::Processing;
manager_state.in_progress = false;
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(!invariant_result.invariants_held);
assert!(!invariant_result.violations.is_empty());
}
#[test]
fn test_failed_state_from_any() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let state = CheckpointState::Failed;
let mut manager_state = CheckpointManagerState::default();
manager_state.current_state = CheckpointState::Failed;
manager_state.in_progress = false;
manager_state.failed_attempts = 1;
let result = validator.validate_checkpoint_state_invariants(&state, &manager_state);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
}
#[test]
fn test_v2_invariant_utils_severity() {
let critical_violation = V2InvariantViolation {
violation_type: V2InvariantViolationType::InvalidV2Version,
description: "Invalid version".to_string(),
expected: Some("2".to_string()),
actual: Some("1".to_string()),
critical: true,
};
assert!(V2InvariantUtils::is_critical_violation(&critical_violation));
assert_eq!(
V2InvariantUtils::get_violation_severity(&critical_violation),
V2InvariantSeverity::Critical
);
let warning_violation = V2InvariantViolation {
violation_type: V2InvariantViolationType::BlockAlignmentViolation,
description: "Block not aligned".to_string(),
expected: None,
actual: None,
critical: false,
};
assert!(!V2InvariantUtils::is_critical_violation(&warning_violation));
assert_eq!(
V2InvariantUtils::get_violation_severity(&warning_violation),
V2InvariantSeverity::Warning
);
}
#[test]
fn test_v2_invariant_utils_compliance_score() {
let violations = vec![V2InvariantViolation {
violation_type: V2InvariantViolationType::BlockAlignmentViolation,
description: "Warning violation".to_string(),
expected: None,
actual: None,
critical: false,
}];
let score = V2InvariantUtils::calculate_compliance_score(&violations);
assert!(score < 1.0);
assert!(score > 0.0);
let critical_violations = vec![V2InvariantViolation {
violation_type: V2InvariantViolationType::InvalidV2Version,
description: "Critical violation".to_string(),
expected: None,
actual: None,
critical: true,
}];
let critical_score = V2InvariantUtils::calculate_compliance_score(&critical_violations);
assert_eq!(critical_score, 0.0);
}
#[test]
fn test_v2_invariant_summary() {
let violations = vec![
V2InvariantViolation {
violation_type: V2InvariantViolationType::InvalidV2Version,
description: "Critical violation".to_string(),
expected: None,
actual: None,
critical: true,
},
V2InvariantViolation {
violation_type: V2InvariantViolationType::BlockAlignmentViolation,
description: "Warning violation".to_string(),
expected: None,
actual: None,
critical: false,
},
];
let summary = V2InvariantUtils::get_violation_summary(&violations);
assert_eq!(summary.total_violations, 2);
assert_eq!(summary.critical_violations, 1);
assert_eq!(summary.compliance_score, 0.0); }
#[test]
fn test_v2_invariant_severity_ordering() {
assert!(V2InvariantSeverity::Info < V2InvariantSeverity::Warning);
assert!(V2InvariantSeverity::Warning < V2InvariantSeverity::Error);
assert!(V2InvariantSeverity::Error < V2InvariantSeverity::Critical);
}
#[test]
fn test_v2_graph_file_invariants() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
checkpoint_path: temp_dir.path().join("test.checkpoint"),
..Default::default()
};
let validator = V2InvariantValidator::new(config);
let graph_file_path = temp_dir.path().join("test.graph");
let result = validator.validate_v2_graph_file_invariants(&graph_file_path);
assert!(result.is_ok());
let invariant_result = result.unwrap();
assert!(invariant_result.invariants_held);
assert!(invariant_result.violations.is_empty());
}
}