use crate::error::Result;
use crate::models::ConsolidatedRecord;
use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
use std::collections::HashMap;
/// Aggregated outcome of an integrity check over a batch of records.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
/// Number of records that were handed to the check.
pub total_records: usize,
/// Records for which `has_valid_data()` returned true.
pub valid_records: usize,
/// Records that were not valid but for which `has_suspect_data()` returned true.
pub suspect_records: usize,
/// Records that were neither valid nor suspect.
pub invalid_records: usize,
/// Records for which `has_missing_data()` returned true; counted independently
/// of the valid/suspect/invalid classification.
pub missing_data_records: usize,
/// Every violation recorded by the relationship, range and time-series checks.
pub temperature_violations: Vec<TemperatureViolation>,
/// Per-station counters and temperature extremes, keyed by station id.
pub station_statistics: HashMap<u32, StationStatistics>,
}
/// A single problem found while checking a station's temperature data.
#[derive(Debug, Clone)]
pub struct TemperatureViolation {
/// Station the offending record belongs to.
pub station_id: u32,
/// Date of the offending record; for jump violations this is the later of
/// the two consecutive records.
pub date: chrono::NaiveDate,
/// Category of the problem.
pub violation_type: ViolationType,
/// Human-readable description of the problem.
pub details: String,
}
/// Category of a [`TemperatureViolation`].
///
/// The enum carries no data, so it is cheap to copy and can be compared,
/// hashed and used as a map key when grouping violations by kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ViolationType {
    /// The minimum temperature is greater than the average temperature.
    MinGreaterThanAvg,
    /// The average temperature is greater than the maximum temperature.
    AvgGreaterThanMax,
    /// A temperature lies outside the configured valid range.
    OutOfRange,
    /// A temperature changed by more than the jump threshold between two
    /// consecutive records of the same station.
    SuspiciousJump,
}
/// Per-station counters and temperature extremes gathered during a check.
///
/// `Default` yields zeroed counters and `None` for every temperature field.
#[derive(Debug, Clone, Default)]
pub struct StationStatistics {
/// Number of records seen for the station.
pub total_records: usize,
/// Records for which `has_valid_data()` returned true.
pub valid_records: usize,
/// Records that were not valid but for which `has_suspect_data()` returned true.
pub suspect_records: usize,
/// Records for which `has_missing_data()` returned true.
pub missing_data_records: usize,
/// Lowest non-missing `min_temp` seen for the station; `None` until one is seen.
pub min_temp: Option<f32>,
/// Running maximum temperature for the station; `None` until a usable
/// reading is seen.
pub max_temp: Option<f32>,
/// Average temperature for the station.
/// NOTE(review): `check_integrity` in this file never assigns this field, so
/// it stays `None` unless set elsewhere — confirm the intended aggregation.
pub avg_temp: Option<f32>,
}
/// Runs integrity checks over consolidated temperature records.
pub struct IntegrityChecker {
/// Largest absolute change of a temperature between two consecutive records
/// of a station that is still accepted; anything strictly greater is reported
/// as a suspicious jump.
temperature_jump_threshold: f32,
}
impl IntegrityChecker {
/// Creates a checker that reports temperature jumps greater than 20.0
/// between consecutive records of a station.
pub fn new() -> Self {
    let temperature_jump_threshold = 20.0;
    Self {
        temperature_jump_threshold,
    }
}
/// Creates a checker for the requested strictness.
///
/// NOTE(review): `_strict_mode` is accepted but not consulted; both modes
/// currently use the same 20.0 jump threshold.
pub fn with_strict_mode(_strict_mode: bool) -> Self {
    let temperature_jump_threshold = 20.0;
    Self {
        temperature_jump_threshold,
    }
}
/// Runs all integrity checks over `records` and returns the aggregated report.
///
/// Each record goes through the per-record checks and is folded into its
/// station's statistics. Afterwards each station's records, ordered by date,
/// are scanned for suspicious jumps between consecutive entries.
///
/// Stations are scanned in ascending id order so that the order of the
/// reported violations is the same on every run.
///
/// # Errors
///
/// Propagates any error returned by the per-record or time-series checks.
pub fn check_integrity(&self, records: &[ConsolidatedRecord]) -> Result<IntegrityReport> {
    // Sentinel value marking a missing temperature reading.
    const MISSING_TEMP: f32 = -9999.0;

    let mut report = IntegrityReport {
        total_records: records.len(),
        valid_records: 0,
        suspect_records: 0,
        invalid_records: 0,
        missing_data_records: 0,
        temperature_violations: Vec::new(),
        station_statistics: HashMap::new(),
    };

    // Group the records by station and order each group chronologically.
    let mut station_records: HashMap<u32, Vec<&ConsolidatedRecord>> = HashMap::new();
    for record in records {
        station_records
            .entry(record.station_id)
            .or_default()
            .push(record);
    }
    for series in station_records.values_mut() {
        // Stable sort: records sharing a date keep their input order.
        series.sort_by_key(|r| r.date);
    }

    for record in records {
        self.check_record(record, &mut report)?;

        let stats = report
            .station_statistics
            .entry(record.station_id)
            .or_default();
        stats.total_records += 1;
        if record.has_valid_data() {
            stats.valid_records += 1;
        } else if record.has_suspect_data() {
            stats.suspect_records += 1;
        }
        if record.has_missing_data() {
            stats.missing_data_records += 1;
        }

        // Each extreme is tracked from its own field and guarded by its own
        // sentinel, so a missing minimum no longer hides a valid maximum.
        if record.min_temp != MISSING_TEMP {
            stats.min_temp = Some(
                stats
                    .min_temp
                    .map_or(record.min_temp, |t| t.min(record.min_temp)),
            );
        }
        if record.max_temp != MISSING_TEMP {
            stats.max_temp = Some(
                stats
                    .max_temp
                    .map_or(record.max_temp, |t| t.max(record.max_temp)),
            );
        }
        // NOTE(review): `stats.avg_temp` is not populated here; confirm the
        // intended aggregation before adding it.
    }

    // HashMap iteration order is unspecified; sort by station id so the
    // violation list is reproducible.
    let mut ordered_stations: Vec<_> = station_records.into_iter().collect();
    ordered_stations.sort_unstable_by_key(|(station_id, _)| *station_id);
    for (station_id, series) in ordered_stations {
        self.check_time_series_integrity(station_id, &series, &mut report)?;
    }

    Ok(report)
}
/// Checks a single record and updates the report's counters and violations.
///
/// A failed `validate_relationships()` is recorded as a violation whose
/// `details` is the error text. The range check then runs, and finally the
/// record is counted as valid, suspect or invalid, plus separately as having
/// missing data when applicable.
///
/// # Errors
///
/// Propagates any error returned by the range check.
fn check_record(
    &self,
    record: &ConsolidatedRecord,
    report: &mut IntegrityReport,
) -> Result<()> {
    // Sentinel value marking a missing temperature reading.
    const MISSING_TEMP: f32 = -9999.0;

    if let Err(e) = record.validate_relationships() {
        // Classify the failure from the record's own values instead of always
        // tagging it as `MinGreaterThanAvg`.
        // NOTE(review): assumes `validate_relationships` enforces
        // min <= avg <= max on non-missing values — confirm. Any failure that
        // cannot be classified keeps the previous `MinGreaterThanAvg` tag.
        let present = |t: f32| t != MISSING_TEMP;
        let min_over_avg = present(record.min_temp)
            && present(record.avg_temp)
            && record.min_temp > record.avg_temp;
        let avg_over_max = present(record.avg_temp)
            && present(record.max_temp)
            && record.avg_temp > record.max_temp;
        let violation_type = if avg_over_max && !min_over_avg {
            ViolationType::AvgGreaterThanMax
        } else {
            ViolationType::MinGreaterThanAvg
        };

        report.temperature_violations.push(TemperatureViolation {
            station_id: record.station_id,
            date: record.date,
            violation_type,
            details: e.to_string(),
        });
    }

    self.check_temperature_ranges(record, report)?;

    if record.has_valid_data() {
        report.valid_records += 1;
    } else if record.has_suspect_data() {
        report.suspect_records += 1;
    } else {
        report.invalid_records += 1;
    }
    if record.has_missing_data() {
        report.missing_data_records += 1;
    }

    Ok(())
}
/// Records an `OutOfRange` violation for each of the record's min, max and
/// avg temperatures that is present (not the `-9999.0` sentinel) and lies
/// outside `[MIN_VALID_TEMP, MAX_VALID_TEMP]`.
///
/// Always returns `Ok(())`.
fn check_temperature_ranges(
    &self,
    record: &ConsolidatedRecord,
    report: &mut IntegrityReport,
) -> Result<()> {
    let valid_range = MIN_VALID_TEMP..=MAX_VALID_TEMP;
    let readings = [
        ("min", record.min_temp),
        ("max", record.max_temp),
        ("avg", record.avg_temp),
    ];

    for (label, value) in readings {
        // Missing readings and in-range readings need no report entry.
        if value == -9999.0 || valid_range.contains(&value) {
            continue;
        }
        let details = format!(
            "{} temperature {} is outside valid range [{}, {}]",
            label, value, MIN_VALID_TEMP, MAX_VALID_TEMP
        );
        report.temperature_violations.push(TemperatureViolation {
            station_id: record.station_id,
            date: record.date,
            violation_type: ViolationType::OutOfRange,
            details,
        });
    }

    Ok(())
}
/// Scans consecutive pairs of a station's records and records a
/// `SuspiciousJump` violation whenever the min, max or avg temperature
/// changes by more than `temperature_jump_threshold`.
///
/// Pairs where either reading is the `-9999.0` sentinel are skipped. The
/// violation is dated with the later record of the pair. Always returns
/// `Ok(())`.
fn check_time_series_integrity(
    &self,
    station_id: u32,
    records: &[&ConsolidatedRecord],
    report: &mut IntegrityReport,
) -> Result<()> {
    let is_present = |t: f32| t != -9999.0;

    for pair in records.windows(2) {
        let (earlier, later) = (pair[0], pair[1]);
        let readings = [
            ("min", earlier.min_temp, later.min_temp),
            ("max", earlier.max_temp, later.max_temp),
            ("avg", earlier.avg_temp, later.avg_temp),
        ];

        for (label, before, after) in readings {
            if !(is_present(before) && is_present(after)) {
                continue;
            }
            let delta = (after - before).abs();
            // Kept as a positive `>` test so a NaN delta is never reported.
            if delta > self.temperature_jump_threshold {
                let details = format!(
                    "{} temperature jumped {:.1}°C from {} to {}",
                    label, delta, earlier.date, later.date
                );
                report.temperature_violations.push(TemperatureViolation {
                    station_id,
                    date: later.date,
                    violation_type: ViolationType::SuspiciousJump,
                    details,
                });
            }
        }
    }

    Ok(())
}
/// Renders a human-readable, multi-line summary of `report`.
///
/// The summary lists the record counters (with percentages of the total for
/// valid, suspect and invalid records), the number of violations and the
/// first ten violations in report order.
///
/// For an empty report (`total_records == 0`) the percentages are shown as
/// `0.0%` rather than `NaN%`.
pub fn generate_summary(&self, report: &IntegrityReport) -> String {
    // Share of the total as a percentage; guarded so an empty report does
    // not produce 0/0 = NaN.
    let percent_of_total = |count: usize| -> f64 {
        if report.total_records == 0 {
            0.0
        } else {
            100.0 * count as f64 / report.total_records as f64
        }
    };

    let mut summary = String::new();
    summary.push_str("=== Integrity Check Report ===\n");
    summary.push_str(&format!("Total Records: {}\n", report.total_records));

    let classified = [
        ("Valid", report.valid_records),
        ("Suspect", report.suspect_records),
        ("Invalid", report.invalid_records),
    ];
    for (label, count) in classified {
        summary.push_str(&format!(
            "{} Records: {} ({:.1}%)\n",
            label,
            count,
            percent_of_total(count)
        ));
    }

    summary.push_str(&format!(
        "Missing Data Records: {}\n",
        report.missing_data_records
    ));
    summary.push_str(&format!(
        "\nTemperature Violations: {}\n",
        report.temperature_violations.len()
    ));

    if !report.temperature_violations.is_empty() {
        summary.push_str("\nTop 10 Violations:\n");
        // These are the first ten entries in report order, not a ranking.
        for (i, violation) in report.temperature_violations.iter().take(10).enumerate() {
            summary.push_str(&format!(
                "  {}. Station {} on {}: {}\n",
                i + 1,
                violation.station_id,
                violation.date,
                violation.details
            ));
        }
    }

    summary
}
}
impl Default for IntegrityChecker {
fn default() -> Self {
Self::new()
}
}