pub use self::rules::{
FileValidationRules, ValidationConfig, ValidationContext, ValidationRule, ValidationRuleEngine,
ValidationSeverity,
};
pub use self::consistency::{
CheckpointConsistencyValidator, ConsistencyResult, ConsistencySeverity, ConsistencyUtils,
ConsistencyViolation, ConsistencyViolationType,
};
pub use self::invariants::{
V2InvariantResult, V2InvariantSeverity, V2InvariantSummary, V2InvariantUtils,
V2InvariantValidator, V2InvariantViolation, V2InvariantViolationType,
};
pub use self::reporting::{
AnomalySummary, CheckpointValidationReport, CheckpointValidationReporter, PerformanceMetrics,
TrendAnalysis, ValidationReportUtils, ValidationStatus, ValidationSummary,
};
use crate::backend::native::v2::wal::V2WALConfig;
use crate::backend::native::v2::wal::checkpoint::core::{
CheckpointProgress, CheckpointState, DirtyBlockTracker,
};
use crate::backend::native::v2::wal::checkpoint::errors::{CheckpointError, CheckpointResult};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};
/// Facade over the checkpoint validation sub-components.
///
/// Bundles a rule engine, a consistency validator, a V2 invariant validator and a
/// report generator. `CheckpointValidator::new` hands each configurable component
/// its own clone of the same `V2WALConfig`.
pub struct CheckpointValidator {
    /// Configuration the validator was built from (stored; not read by the
    /// methods visible in this module).
    config: V2WALConfig,
    /// Rule engine; extra rules are appended by
    /// `CheckpointValidatorFactory::create_validator_with_rules`.
    rule_engine: ValidationRuleEngine,
    /// Called with a checkpoint LSN range and the last checkpointed LSN.
    consistency_validator: CheckpointConsistencyValidator,
    /// Validates the V2 metadata of a checkpoint file.
    invariants_validator: V2InvariantValidator,
    /// Turns validation results into a `CheckpointValidationReport`.
    reporter: CheckpointValidationReporter,
}
impl CheckpointValidator {
    /// Builds a validator whose sub-validators and reporter each receive a clone
    /// of `config`; the original is kept on the validator itself.
    pub fn new(config: V2WALConfig) -> Self {
        Self {
            rule_engine: ValidationRuleEngine::new(),
            consistency_validator: CheckpointConsistencyValidator::new(config.clone()),
            invariants_validator: V2InvariantValidator::new(config.clone()),
            reporter: CheckpointValidationReporter::new(config.clone()),
            config,
        }
    }

    /// Checks size, magic number and version of the checkpoint file at `checkpoint_path`.
    ///
    /// Returns `Ok(false)` when the file does not exist and `Ok(true)` when every
    /// check passes.
    ///
    /// # Errors
    /// Propagates any failing `FileValidationRules` check, and reports a validation
    /// error when the file exists but cannot be opened.
    pub fn validate_checkpoint_file(
        &self,
        checkpoint_path: &std::path::Path,
    ) -> CheckpointResult<bool> {
        // A missing file is "not valid" rather than an error.
        if !checkpoint_path.exists() {
            return Ok(false);
        }

        FileValidationRules::validate_file_size(checkpoint_path)?;

        let mut handle = std::fs::File::open(checkpoint_path).map_err(|err| {
            CheckpointError::validation(format!("Failed to open checkpoint file: {}", err))
        })?;
        // Both header checks read from the same handle, in this order.
        FileValidationRules::validate_magic_number(&mut handle)?;
        FileValidationRules::validate_version(&mut handle)?;

        Ok(true)
    }

    /// Runs the consistency validator over `checkpoint_lsn_range` / `last_checkpointed_lsn`.
    ///
    /// # Errors
    /// When the result is inconsistent, returns a validation error carrying the
    /// description of the first violation (or a generic message if none is listed).
    pub fn validate_checkpoint_consistency(
        &self,
        checkpoint_lsn_range: (u64, u64),
        last_checkpointed_lsn: u64,
    ) -> CheckpointResult<()> {
        let outcome = self
            .consistency_validator
            .validate_checkpoint_consistency(checkpoint_lsn_range, last_checkpointed_lsn);

        if outcome.is_consistent {
            return Ok(());
        }

        let message = match outcome.violations.first() {
            Some(violation) => violation.description.clone(),
            None => "Consistency validation failed".to_string(),
        };
        Err(CheckpointError::validation(message))
    }

    /// Verifies the dirty-block tracker is within the given total and the global limit.
    ///
    /// # Errors
    /// Fails when cluster + global blocks exceed `max_pending_blocks`, or when the
    /// global blocks alone exceed `MAX_GLOBAL_DIRTY_BLOCKS`.
    pub fn validate_dirty_block_consistency(
        &self,
        dirty_blocks: &DirtyBlockTracker,
        max_pending_blocks: u64,
    ) -> CheckpointResult<()> {
        let (cluster_blocks, global_blocks) = dirty_blocks.get_statistics();

        let pending = (cluster_blocks + global_blocks) as u64;
        if pending > max_pending_blocks {
            return Err(CheckpointError::validation(format!(
                "Too many pending dirty blocks: {} (maximum: {})",
                pending, max_pending_blocks
            )));
        }

        let global = global_blocks as u64;
        if global > MAX_GLOBAL_DIRTY_BLOCKS as u64 {
            return Err(CheckpointError::validation(format!(
                "Too many global dirty blocks: {} (maximum: {})",
                global, MAX_GLOBAL_DIRTY_BLOCKS
            )));
        }

        Ok(())
    }

    /// Runs the V2 invariant check and the LSN consistency check, then builds a report.
    ///
    /// The tracker, state, progress and `_max_pending_blocks` arguments are accepted
    /// but not used here.
    ///
    /// # Errors
    /// Propagates an invariant-validator error, and fails when the report status is
    /// `Failed` or `CriticalFailure`.
    pub fn validate_comprehensive(
        &self,
        checkpoint_path: &std::path::Path,
        _dirty_blocks: &DirtyBlockTracker,
        _checkpoint_state: &CheckpointState,
        _checkpoint_progress: &CheckpointProgress,
        checkpoint_lsn_range: (u64, u64),
        last_checkpointed_lsn: u64,
        checkpoint_duration: Duration,
        _max_pending_blocks: u64,
    ) -> CheckpointResult<CheckpointValidationReport> {
        let invariants = self
            .invariants_validator
            .validate_v2_metadata(checkpoint_path)?;
        let consistency = self
            .consistency_validator
            .validate_checkpoint_consistency(checkpoint_lsn_range, last_checkpointed_lsn);

        let report = self.reporter.generate_validation_report(
            checkpoint_path,
            Some(consistency),
            Some(invariants),
            None,
            Some(checkpoint_duration),
        );

        if matches!(
            report.validation_status,
            ValidationStatus::CriticalFailure | ValidationStatus::Failed
        ) {
            return Err(CheckpointError::validation(format!(
                "Validation failed: {:?}",
                report.validation_status
            )));
        }
        Ok(report)
    }

    /// Borrow the rule engine.
    pub fn rule_engine(&self) -> &ValidationRuleEngine {
        &self.rule_engine
    }

    /// Borrow the consistency validator.
    pub fn consistency_validator(&self) -> &CheckpointConsistencyValidator {
        &self.consistency_validator
    }

    /// Borrow the V2 invariant validator.
    pub fn invariants_validator(&self) -> &V2InvariantValidator {
        &self.invariants_validator
    }

    /// Borrow the report generator.
    pub fn reporter(&self) -> &CheckpointValidationReporter {
        &self.reporter
    }
}
/// Collector of checkpoint performance metrics.
///
/// The data sits behind `Arc<Mutex<_>>`; the accessors lock it and map a poisoned
/// lock to a `CheckpointError`.
pub struct CheckpointMetrics {
    /// Configuration passed on to the reporter when a performance report is generated.
    config: V2WALConfig,
    /// Mutex-protected metric state.
    metrics: Arc<Mutex<CheckpointMetricsData>>,
}
/// Snapshot of checkpoint performance metrics.
///
/// `CheckpointMetrics::get_metrics` returns a clone of this; the `avg_*` fields and
/// `checkpoint_throughput_mbps` are exponentially smoothed by
/// `CheckpointMetrics::update_checkpoint_metrics`.
#[derive(Debug, Default, Clone)]
pub struct CheckpointMetricsData {
    /// Number of checkpoints recorded since creation or the last reset.
    pub total_checkpoints: u64,
    /// Smoothed checkpoint duration in milliseconds.
    pub avg_checkpoint_duration_ms: u64,
    /// Smoothed number of flushed blocks per checkpoint.
    pub avg_blocks_per_checkpoint: u64,
    /// Smoothed number of records per checkpoint.
    pub avg_records_per_checkpoint: u64,
    /// Smoothed throughput estimate in MB/s (payload estimated at 100 bytes per record).
    pub checkpoint_throughput_mbps: f64,
    /// Milliseconds since the last checkpoint; refreshed whenever metrics are read.
    pub time_since_last_checkpoint_ms: u64,
    /// NOTE(review): not written anywhere in this module — confirm who maintains it.
    pub wal_size_at_last_checkpoint: u64,
    /// NOTE(review): not written anywhere in this module — confirm who maintains it.
    pub pending_dirty_blocks: u64,
    /// Wall-clock time of the most recent recorded checkpoint.
    pub last_checkpoint_timestamp: Option<SystemTime>,
    /// Most recent raw durations (ms), oldest first, capped at `MAX_PROGRESS_ENTRIES`.
    pub recent_durations_ms: Vec<u64>,
    /// Baselines, thresholds and anomaly counters.
    pub anomaly_detector: AnomalyDetector,
}
/// Baselines and multiplicative thresholds used to flag anomalous checkpoints.
///
/// The derived `Default` zeroes every field, including baselines and thresholds;
/// `CheckpointMetrics::new` installs non-zero values.
#[derive(Debug, Default, Clone)]
pub struct AnomalyDetector {
    /// Expected checkpoint duration in milliseconds.
    pub baseline_duration_ms: u64,
    /// Expected throughput in MB/s.
    pub baseline_throughput_mbps: f64,
    /// Expected number of flushed blocks per checkpoint.
    pub baseline_blocks_per_checkpoint: u64,
    /// A duration above `baseline_duration_ms * this` counts as an anomaly.
    pub duration_anomaly_threshold: f64,
    /// A smoothed throughput below `baseline_throughput_mbps * this` counts as an anomaly.
    pub throughput_anomaly_threshold: f64,
    /// A flushed-block count above `baseline_blocks_per_checkpoint * this` counts as an anomaly.
    pub block_count_anomaly_threshold: f64,
    /// Number of duration anomalies observed.
    pub duration_anomalies: u64,
    /// Number of throughput anomalies observed.
    pub throughput_anomalies: u64,
    /// Number of block-count anomalies observed.
    pub block_count_anomalies: u64,
}
impl CheckpointMetrics {
    /// Creates a collector with zeroed counters and the default anomaly baselines.
    pub fn new(config: V2WALConfig) -> Self {
        Self {
            config,
            metrics: Arc::new(Mutex::new(Self::initial_metrics_data())),
        }
    }

    /// State used both at construction and by `reset_metrics`, so a reset restores
    /// exactly the construction-time baselines and thresholds.
    fn initial_metrics_data() -> CheckpointMetricsData {
        CheckpointMetricsData {
            anomaly_detector: AnomalyDetector {
                // NOTE(review): a *duration* baseline (ms) is derived from a *throughput*
                // target (MB/s) here; the units do not match. Kept unchanged to preserve
                // behaviour — confirm the intended baseline.
                baseline_duration_ms: performance::TARGET_CHECKPOINT_THROUGHPUT_MBPS as u64 * 1000,
                baseline_throughput_mbps: performance::TARGET_CHECKPOINT_THROUGHPUT_MBPS,
                baseline_blocks_per_checkpoint: 100,
                // Anomaly when a checkpoint takes more than 2x the baseline duration.
                duration_anomaly_threshold: 2.0,
                // Anomaly when smoothed throughput falls below half the baseline.
                throughput_anomaly_threshold: 0.5,
                // Anomaly when flushed blocks exceed 3x the baseline.
                block_count_anomaly_threshold: 3.0,
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Locks the shared metrics, converting a poisoned mutex into a `CheckpointError`.
    fn lock_metrics(
        &self,
    ) -> CheckpointResult<std::sync::MutexGuard<'_, CheckpointMetricsData>> {
        self.metrics
            .lock()
            .map_err(|e| CheckpointError::validation(format!("Failed to lock metrics: {}", e)))
    }

    /// One exponential-moving-average step.
    ///
    /// When `seed` is true the sample becomes the average outright. Otherwise the
    /// first reading would be shrunk toward the zero initial value, because it would
    /// be weighted by only `alpha`.
    fn smooth(previous: f64, sample: f64, alpha: f64, seed: bool) -> f64 {
        if seed {
            sample
        } else {
            previous * (1.0 - alpha) + sample * alpha
        }
    }

    /// Records one completed checkpoint that started at `start_time`.
    ///
    /// Updates the smoothed averages, the throughput estimate, the bounded history
    /// of raw durations and the anomaly counters.
    ///
    /// # Errors
    /// Fails only if the metrics mutex is poisoned.
    pub fn update_checkpoint_metrics(
        &self,
        progress: &CheckpointProgress,
        start_time: Instant,
    ) -> CheckpointResult<()> {
        let duration_ms = start_time.elapsed().as_millis() as u64;
        let mut metrics = self.lock_metrics()?;

        metrics.total_checkpoints += 1;
        // Right after construction or `reset_metrics` the averages hold no samples yet.
        let first_sample = metrics.total_checkpoints == 1;
        let alpha = METRICS_SMOOTHING_ALPHA;

        metrics.avg_checkpoint_duration_ms = Self::smooth(
            metrics.avg_checkpoint_duration_ms as f64,
            duration_ms as f64,
            alpha,
            first_sample,
        ) as u64;
        metrics.avg_blocks_per_checkpoint = Self::smooth(
            metrics.avg_blocks_per_checkpoint as f64,
            progress.flushed_blocks as f64,
            alpha,
            first_sample,
        ) as u64;
        metrics.avg_records_per_checkpoint = Self::smooth(
            metrics.avg_records_per_checkpoint as f64,
            progress.total_records as f64,
            alpha,
            first_sample,
        ) as u64;

        // A zero-length checkpoint gives no meaningful rate; keep the previous estimate.
        if duration_ms > 0 {
            // NOTE(review): payload size is estimated at a fixed 100 bytes per record.
            // Computed in f64 so huge record counts cannot overflow an integer.
            let bytes_processed = progress.total_records as f64 * 100.0;
            let mb_per_second =
                bytes_processed / (1024.0 * 1024.0) / (duration_ms as f64 / 1000.0);
            // 0.0 means no throughput sample has been recorded yet. This covers the
            // fresh/reset state, and the case where every earlier checkpoint took 0 ms.
            let seed_throughput = metrics.checkpoint_throughput_mbps == 0.0;
            metrics.checkpoint_throughput_mbps = Self::smooth(
                metrics.checkpoint_throughput_mbps,
                mb_per_second,
                alpha,
                seed_throughput,
            );
        }

        metrics.last_checkpoint_timestamp = Some(SystemTime::now());

        // Keep only the newest MAX_PROGRESS_ENTRIES durations (oldest at the front).
        metrics.recent_durations_ms.push(duration_ms);
        let history_len = metrics.recent_durations_ms.len();
        if history_len > MAX_PROGRESS_ENTRIES {
            metrics
                .recent_durations_ms
                .drain(..history_len - MAX_PROGRESS_ENTRIES);
        }

        self.detect_anomalies(&mut metrics, duration_ms, progress);
        Ok(())
    }

    /// Increments the anomaly counters whose threshold the latest checkpoint crossed.
    ///
    /// The throughput check uses the *smoothed* throughput, not the raw sample.
    fn detect_anomalies(
        &self,
        metrics: &mut CheckpointMetricsData,
        duration_ms: u64,
        progress: &CheckpointProgress,
    ) {
        let detector = &mut metrics.anomaly_detector;
        if duration_ms as f64
            > detector.baseline_duration_ms as f64 * detector.duration_anomaly_threshold
        {
            detector.duration_anomalies += 1;
        }
        if metrics.checkpoint_throughput_mbps
            < detector.baseline_throughput_mbps * detector.throughput_anomaly_threshold
        {
            detector.throughput_anomalies += 1;
        }
        if progress.flushed_blocks as f64
            > detector.baseline_blocks_per_checkpoint as f64
                * detector.block_count_anomaly_threshold
        {
            detector.block_count_anomalies += 1;
        }
    }

    /// Returns a snapshot of the metrics after refreshing `time_since_last_checkpoint_ms`.
    ///
    /// If the last timestamp lies in the future (the clock went backwards), the
    /// elapsed time is reported as zero.
    ///
    /// # Errors
    /// Fails only if the metrics mutex is poisoned.
    pub fn get_metrics(&self) -> CheckpointResult<CheckpointMetricsData> {
        let mut metrics = self.lock_metrics()?;
        if let Some(last_checkpoint) = metrics.last_checkpoint_timestamp {
            metrics.time_since_last_checkpoint_ms = last_checkpoint
                .elapsed()
                .unwrap_or(Duration::ZERO)
                .as_millis() as u64;
        }
        Ok(metrics.clone())
    }

    /// Restores the metrics to their construction-time state.
    ///
    /// This zeroes the counters, averages and history, and restores the default
    /// baselines and thresholds.
    ///
    /// # Errors
    /// Fails only if the metrics mutex is poisoned.
    pub fn reset_metrics(&self) -> CheckpointResult<()> {
        let mut metrics = self.lock_metrics()?;
        *metrics = Self::initial_metrics_data();
        Ok(())
    }

    /// Renders a human-readable performance report from the current metrics.
    ///
    /// # Errors
    /// Fails only if the metrics mutex is poisoned.
    pub fn generate_performance_report(&self) -> CheckpointResult<String> {
        let metrics = self.get_metrics()?;
        let detector = &metrics.anomaly_detector;

        let total_anomalies = detector.duration_anomalies
            + detector.throughput_anomalies
            + detector.block_count_anomalies;
        // Each checkpoint can trip up to three anomaly kinds, hence the `* 3.0`.
        let anomaly_percentage = if metrics.total_checkpoints > 0 {
            (total_anomalies as f64 / (metrics.total_checkpoints as f64 * 3.0)) * 100.0
        } else {
            0.0
        };

        let performance_metrics = PerformanceMetrics {
            total_checkpoints: metrics.total_checkpoints,
            avg_checkpoint_duration_ms: metrics.avg_checkpoint_duration_ms,
            checkpoint_throughput_mbps: metrics.checkpoint_throughput_mbps,
            avg_blocks_per_checkpoint: metrics.avg_blocks_per_checkpoint,
            avg_records_per_checkpoint: metrics.avg_records_per_checkpoint,
            anomaly_summary: AnomalySummary {
                duration_anomalies: detector.duration_anomalies,
                throughput_anomalies: detector.throughput_anomalies,
                block_count_anomalies: detector.block_count_anomalies,
                anomaly_percentage,
            },
        };
        let reporter = CheckpointValidationReporter::new(self.config.clone());
        Ok(reporter.generate_performance_report(&performance_metrics))
    }
}
/// Checkpoint housekeeping.
///
/// Decides when a checkpoint should be forced, and prunes old `*.checkpoint` files
/// in the directory of the configured checkpoint path.
pub struct CheckpointCleanup {
    /// Configuration; `checkpoint_path` locates the checkpoint directory and the
    /// active checkpoint file that pruning must keep.
    config: V2WALConfig,
}
impl CheckpointCleanup {
    /// Creates a cleanup helper for the given configuration.
    pub fn new(config: V2WALConfig) -> Self {
        Self { config }
    }

    /// Reports how many blocks were checkpointed.
    ///
    /// NOTE(review): this does not modify `_dirty_blocks`; it only prints the count
    /// when `checkpointed_blocks` is non-empty. Confirm whether clearing is expected
    /// to happen here.
    pub fn clear_checkpointed_blocks(
        &self,
        _dirty_blocks: &mut DirtyBlockTracker,
        checkpointed_blocks: &[u64],
    ) -> CheckpointResult<()> {
        if !checkpointed_blocks.is_empty() {
            println!(
                "Cleaning up {} checkpointed blocks",
                checkpointed_blocks.len()
            );
        }
        Ok(())
    }

    /// Returns whether a checkpoint should be forced now.
    ///
    /// Returns true when more than `max_wait_time` has passed since
    /// `last_checkpoint_time`. It also returns true when `state` is one of the
    /// in-progress states and more than `DEFAULT_CHECKPOINT_TIMEOUT_MS` has passed.
    /// A `last_checkpoint_time` in the future counts as zero elapsed time.
    pub fn force_checkpoint_if_needed(
        &self,
        state: &CheckpointState,
        last_checkpoint_time: SystemTime,
        max_wait_time: Duration,
    ) -> CheckpointResult<bool> {
        let time_since_last = last_checkpoint_time.elapsed().unwrap_or(Duration::ZERO);
        if time_since_last > max_wait_time {
            return Ok(true);
        }

        let in_progress = matches!(
            state,
            CheckpointState::Initializing
                | CheckpointState::Collecting
                | CheckpointState::Processing
                | CheckpointState::Flushing
        );
        Ok(in_progress
            && time_since_last > Duration::from_millis(DEFAULT_CHECKPOINT_TIMEOUT_MS))
    }

    /// Deletes the oldest `*.checkpoint` files, keeping at most
    /// `max_checkpoints_to_keep`.
    ///
    /// Only regular files in the directory of the configured checkpoint path are
    /// considered. The active checkpoint file itself is never deleted, and files are
    /// ordered by modification time. Individual deletion failures are skipped.
    /// Returns the number of files actually removed.
    ///
    /// # Errors
    /// Fails when the checkpoint path has no parent or the directory cannot be read.
    pub fn cleanup_old_checkpoints(
        &self,
        max_checkpoints_to_keep: usize,
    ) -> CheckpointResult<usize> {
        use std::fs;
        use std::path::Path;

        let checkpoint_path = &self.config.checkpoint_path;
        let parent = checkpoint_path.parent().ok_or_else(|| {
            CheckpointError::validation("Invalid checkpoint path".to_string())
        })?;
        // A bare file name (e.g. `wal.checkpoint`) has an empty parent, and
        // `read_dir("")` fails. The file lives in the current directory.
        let checkpoint_dir = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        // The active checkpoint must survive pruning; resolve its name once.
        let active_name = checkpoint_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("");

        let mut checkpoint_files = Vec::new();
        for entry in fs::read_dir(checkpoint_dir).map_err(|e| {
            CheckpointError::validation(format!("Failed to read checkpoint directory: {}", e))
        })? {
            let entry = entry.map_err(|e| {
                CheckpointError::validation(format!("Failed to read directory entry: {}", e))
            })?;
            let path = entry.path();

            let is_candidate = path
                .file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |name| name.ends_with(".checkpoint") && name != active_name);
            if !is_candidate {
                continue;
            }

            // Only regular files count. A directory with a `.checkpoint` suffix cannot
            // be removed with `remove_file` and would otherwise take the place of a
            // real old checkpoint in the removal quota.
            if let Ok(metadata) = fs::metadata(&path) {
                if metadata.is_file() {
                    if let Ok(modified) = metadata.modified() {
                        checkpoint_files.push((path, modified));
                    }
                }
            }
        }

        // Oldest first, so the leading entries are the ones to delete.
        checkpoint_files.sort_by_key(|(_, modified)| *modified);
        let files_to_remove = checkpoint_files
            .len()
            .saturating_sub(max_checkpoints_to_keep);

        let mut removed_count = 0;
        for (path, _) in checkpoint_files.iter().take(files_to_remove) {
            if fs::remove_file(path).is_ok() {
                removed_count += 1;
            }
        }
        Ok(removed_count)
    }
}
/// Stateless constructors for the checkpoint validation components.
pub struct CheckpointValidatorFactory;

impl CheckpointValidatorFactory {
    /// Builds a `CheckpointValidator` with its default rule engine.
    pub fn create_default_validator(config: V2WALConfig) -> CheckpointResult<CheckpointValidator> {
        Ok(CheckpointValidator::new(config))
    }

    /// Builds a `CheckpointValidator` and appends `rules`, in order, to its rule engine.
    pub fn create_validator_with_rules(
        config: V2WALConfig,
        rules: Vec<ValidationRule>,
    ) -> CheckpointResult<CheckpointValidator> {
        let mut validator = CheckpointValidator::new(config);
        rules.into_iter().for_each(|rule| {
            validator.rule_engine.add_rule(rule);
        });
        Ok(validator)
    }

    /// Builds a `CheckpointMetrics` collector.
    pub fn create_metrics(config: V2WALConfig) -> CheckpointResult<CheckpointMetrics> {
        Ok(CheckpointMetrics::new(config))
    }

    /// Builds a `CheckpointCleanup` helper.
    pub fn create_cleanup(config: V2WALConfig) -> CheckpointResult<CheckpointCleanup> {
        Ok(CheckpointCleanup::new(config))
    }

    /// Builds the validator, metrics and cleanup components from one configuration.
    pub fn create_all_components(config: V2WALConfig) -> CheckpointResult<ValidationComponents> {
        let validator = Self::create_default_validator(config.clone())?;
        let metrics = Self::create_metrics(config.clone())?;
        let cleanup = Self::create_cleanup(config)?;
        Ok(ValidationComponents {
            validator,
            metrics,
            cleanup,
        })
    }
}
/// The validator, metrics collector and cleanup helper built together by
/// `CheckpointValidatorFactory::create_all_components` from one configuration.
pub struct ValidationComponents {
    /// Checkpoint validator.
    pub validator: CheckpointValidator,
    /// Checkpoint performance metrics collector.
    pub metrics: CheckpointMetrics,
    /// Checkpoint housekeeping helper.
    pub cleanup: CheckpointCleanup,
}
pub mod consistency;
pub mod invariants;
pub mod reporting;
pub mod rules;
use crate::backend::native::v2::wal::checkpoint::constants::performance;
use crate::backend::native::v2::wal::checkpoint::constants::*;
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a config whose WAL and checkpoint files live inside `dir`.
    fn test_config(dir: &std::path::Path) -> V2WALConfig {
        V2WALConfig {
            wal_path: dir.join("test.wal"),
            checkpoint_path: dir.join("test.checkpoint"),
            ..Default::default()
        }
    }

    #[test]
    fn test_checkpoint_validator_creation() {
        let temp_dir = tempdir().unwrap();
        let validator = CheckpointValidator::new(test_config(temp_dir.path()));
        // The checkpoint file was never written: reported as "not valid", not an error.
        let missing = temp_dir.path().join("test.checkpoint");
        assert!(matches!(
            validator.validate_checkpoint_file(&missing),
            Ok(false)
        ));
    }

    #[test]
    fn test_checkpoint_metrics_creation() {
        let temp_dir = tempdir().unwrap();
        let metrics = CheckpointMetrics::new(test_config(temp_dir.path()));
        let data = metrics.get_metrics().unwrap();
        assert_eq!(data.total_checkpoints, 0);
        assert!(data.last_checkpoint_timestamp.is_none());
        assert!(data.recent_durations_ms.is_empty());
        assert_eq!(data.anomaly_detector.baseline_blocks_per_checkpoint, 100);
        assert_eq!(data.anomaly_detector.duration_anomaly_threshold, 2.0);
        assert_eq!(data.anomaly_detector.throughput_anomaly_threshold, 0.5);
        assert_eq!(data.anomaly_detector.block_count_anomaly_threshold, 3.0);
    }

    #[test]
    fn test_reset_metrics_restores_defaults() {
        let temp_dir = tempdir().unwrap();
        let metrics = CheckpointMetrics::new(test_config(temp_dir.path()));
        metrics.reset_metrics().unwrap();
        let data = metrics.get_metrics().unwrap();
        assert_eq!(data.total_checkpoints, 0);
        assert_eq!(data.anomaly_detector.duration_anomaly_threshold, 2.0);
        assert_eq!(data.anomaly_detector.throughput_anomaly_threshold, 0.5);
        assert_eq!(data.anomaly_detector.block_count_anomaly_threshold, 3.0);
    }

    #[test]
    fn test_checkpoint_cleanup_creation() {
        let temp_dir = tempdir().unwrap();
        let cleanup = CheckpointCleanup::new(test_config(temp_dir.path()));
        // Empty directory: nothing to prune.
        assert_eq!(cleanup.cleanup_old_checkpoints(0).unwrap(), 0);
    }

    #[test]
    fn test_cleanup_keeps_active_and_newest() {
        let temp_dir = tempdir().unwrap();
        let dir = temp_dir.path();
        std::fs::write(dir.join("test.checkpoint"), b"active").unwrap();
        std::fs::write(dir.join("old1.checkpoint"), b"old").unwrap();
        std::fs::write(dir.join("old2.checkpoint"), b"old").unwrap();
        std::fs::write(dir.join("unrelated.wal"), b"wal").unwrap();

        let cleanup = CheckpointCleanup::new(test_config(dir));
        assert_eq!(cleanup.cleanup_old_checkpoints(1).unwrap(), 1);

        assert!(dir.join("test.checkpoint").exists());
        assert!(dir.join("unrelated.wal").exists());
        let remaining_old = ["old1.checkpoint", "old2.checkpoint"]
            .iter()
            .filter(|name| dir.join(name).exists())
            .count();
        assert_eq!(remaining_old, 1);
    }

    #[test]
    fn test_checkpoint_validator_factory() {
        let temp_dir = tempdir().unwrap();
        let config = test_config(temp_dir.path());
        assert!(CheckpointValidatorFactory::create_default_validator(config.clone()).is_ok());
        assert!(
            CheckpointValidatorFactory::create_validator_with_rules(config.clone(), Vec::new())
                .is_ok()
        );
        assert!(CheckpointValidatorFactory::create_metrics(config.clone()).is_ok());
        assert!(CheckpointValidatorFactory::create_cleanup(config.clone()).is_ok());
        assert!(CheckpointValidatorFactory::create_all_components(config).is_ok());
    }

    #[test]
    fn test_anomaly_detector_creation() {
        let detector = AnomalyDetector::default();
        assert_eq!(detector.duration_anomalies, 0);
        assert_eq!(detector.throughput_anomalies, 0);
        assert_eq!(detector.block_count_anomalies, 0);
    }

    #[test]
    fn test_checkpoint_metrics_data_default() {
        let metrics = CheckpointMetricsData::default();
        assert_eq!(metrics.total_checkpoints, 0);
        assert_eq!(metrics.avg_checkpoint_duration_ms, 0);
        assert_eq!(metrics.checkpoint_throughput_mbps, 0.0);
    }

    #[test]
    fn test_validation_components_structure() {
        let temp_dir = tempdir().unwrap();
        let components =
            CheckpointValidatorFactory::create_all_components(test_config(temp_dir.path()))
                .unwrap();
        let missing = temp_dir.path().join("test.checkpoint");
        assert!(matches!(
            components.validator.validate_checkpoint_file(&missing),
            Ok(false)
        ));
        assert_eq!(components.metrics.get_metrics().unwrap().total_checkpoints, 0);
        assert_eq!(components.cleanup.cleanup_old_checkpoints(0).unwrap(), 0);
    }

    #[test]
    fn test_backward_compatibility() {
        let temp_dir = tempdir().unwrap();
        let config = test_config(temp_dir.path());
        // Compile-time check that the public component types keep their names.
        let _validator: CheckpointValidator = CheckpointValidator::new(config.clone());
        let _metrics: CheckpointMetrics = CheckpointMetrics::new(config.clone());
        let _cleanup: CheckpointCleanup = CheckpointCleanup::new(config);
    }
}