ecad_processor/processors/
integrity_checker.rs

1use crate::error::Result;
2use crate::models::ConsolidatedRecord;
3use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
4use std::collections::HashMap;
5
6#[derive(Debug, Clone)]
7pub struct IntegrityReport {
8    pub total_records: usize,
9    pub valid_records: usize,
10    pub suspect_records: usize,
11    pub invalid_records: usize,
12    pub missing_data_records: usize,
13    pub temperature_violations: Vec<TemperatureViolation>,
14    pub station_statistics: HashMap<u32, StationStatistics>,
15}
16
17#[derive(Debug, Clone)]
18pub struct TemperatureViolation {
19    pub station_id: u32,
20    pub date: chrono::NaiveDate,
21    pub violation_type: ViolationType,
22    pub details: String,
23}
24
/// Category of a [`TemperatureViolation`].
///
/// The enum is a plain tag without payload, so it is `Copy` and can be compared,
/// hashed and used as a map/set key (e.g. to count violations per category).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ViolationType {
    /// The relationship between a record's minimum and average temperature is broken.
    MinGreaterThanAvg,
    /// The relationship between a record's average and maximum temperature is broken.
    AvgGreaterThanMax,
    /// A temperature lies outside the `[MIN_VALID_TEMP, MAX_VALID_TEMP]` range.
    OutOfRange,
    /// A temperature changed by more than the configured threshold between two
    /// consecutive records of the same station.
    SuspiciousJump,
}
32
/// Per-station counters and temperature summary collected during an integrity check.
///
/// `Default` yields all-zero counters and `None` for every temperature, which is the
/// starting state for a station that has not been seen yet. `PartialEq` is derived so
/// that statistics can be compared directly (for example in assertions).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct StationStatistics {
    /// Number of records seen for the station.
    pub total_records: usize,
    /// Records of the station classified as valid.
    pub valid_records: usize,
    /// Records of the station classified as suspect (and not valid).
    pub suspect_records: usize,
    /// Records of the station that have missing data.
    pub missing_data_records: usize,
    /// Lowest temperature tracked for the station, or `None` if nothing was observed.
    pub min_temp: Option<f32>,
    /// Highest temperature tracked for the station, or `None` if nothing was observed.
    pub max_temp: Option<f32>,
    /// Average temperature for the station, or `None` if it has not been computed.
    pub avg_temp: Option<f32>,
}
43
/// Runs integrity checks over consolidated temperature records.
///
/// `Debug` and `Clone` are derived so the checker can be logged and duplicated like
/// the other public types in this module.
#[derive(Debug, Clone)]
pub struct IntegrityChecker {
    /// Largest absolute temperature change between two consecutive records of a
    /// station that is still accepted; anything larger is reported as a suspicious jump.
    temperature_jump_threshold: f32,
}
47
48impl IntegrityChecker {
49    pub fn new() -> Self {
50        Self {
51            temperature_jump_threshold: 20.0, // 20°C jump between consecutive days
52        }
53    }
54
55    pub fn with_strict_mode(_strict_mode: bool) -> Self {
56        Self {
57            temperature_jump_threshold: 20.0,
58        }
59    }
60
61    /// Check integrity of consolidated records
62    pub fn check_integrity(&self, records: &[ConsolidatedRecord]) -> Result<IntegrityReport> {
63        let mut report = IntegrityReport {
64            total_records: records.len(),
65            valid_records: 0,
66            suspect_records: 0,
67            invalid_records: 0,
68            missing_data_records: 0,
69            temperature_violations: Vec::new(),
70            station_statistics: HashMap::new(),
71        };
72
73        // Group records by station for time series checks
74        let mut station_records: HashMap<u32, Vec<&ConsolidatedRecord>> = HashMap::new();
75        for record in records {
76            station_records
77                .entry(record.station_id)
78                .or_default()
79                .push(record);
80        }
81
82        // Sort each station's records by date
83        for records in station_records.values_mut() {
84            records.sort_by_key(|r| r.date);
85        }
86
87        // Check each record
88        for record in records {
89            self.check_record(record, &mut report)?;
90
91            // Update station statistics
92            let stats = report
93                .station_statistics
94                .entry(record.station_id)
95                .or_default();
96
97            stats.total_records += 1;
98
99            if record.has_valid_data() {
100                stats.valid_records += 1;
101            } else if record.has_suspect_data() {
102                stats.suspect_records += 1;
103            }
104
105            if record.has_missing_data() {
106                stats.missing_data_records += 1;
107            }
108
109            // Update temperature ranges
110            if record.min_temp != -9999.0 {
111                stats.min_temp = Some(
112                    stats
113                        .min_temp
114                        .map_or(record.min_temp, |t| t.min(record.min_temp)),
115                );
116                stats.max_temp = Some(
117                    stats
118                        .max_temp
119                        .map_or(record.min_temp, |t| t.max(record.min_temp)),
120                );
121            }
122        }
123
124        // Check time series integrity
125        for (station_id, records) in station_records {
126            self.check_time_series_integrity(station_id, &records, &mut report)?;
127        }
128
129        Ok(report)
130    }
131
132    /// Check individual record integrity
133    fn check_record(
134        &self,
135        record: &ConsolidatedRecord,
136        report: &mut IntegrityReport,
137    ) -> Result<()> {
138        // Validate basic constraints
139        let validation_result = record.validate_relationships();
140
141        if let Err(e) = validation_result {
142            report.temperature_violations.push(TemperatureViolation {
143                station_id: record.station_id,
144                date: record.date,
145                violation_type: ViolationType::MinGreaterThanAvg,
146                details: e.to_string(),
147            });
148        }
149
150        // Check temperature ranges
151        self.check_temperature_ranges(record, report)?;
152
153        // Count record types
154        if record.has_valid_data() {
155            report.valid_records += 1;
156        } else if record.has_suspect_data() {
157            report.suspect_records += 1;
158        } else {
159            report.invalid_records += 1;
160        }
161
162        if record.has_missing_data() {
163            report.missing_data_records += 1;
164        }
165
166        Ok(())
167    }
168
169    /// Check if temperatures are within valid ranges
170    fn check_temperature_ranges(
171        &self,
172        record: &ConsolidatedRecord,
173        report: &mut IntegrityReport,
174    ) -> Result<()> {
175        let temps = [
176            (record.min_temp, "min"),
177            (record.max_temp, "max"),
178            (record.avg_temp, "avg"),
179        ];
180
181        for (temp, name) in temps {
182            if temp != -9999.0 && !(MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp) {
183                report.temperature_violations.push(TemperatureViolation {
184                    station_id: record.station_id,
185                    date: record.date,
186                    violation_type: ViolationType::OutOfRange,
187                    details: format!(
188                        "{} temperature {} is outside valid range [{}, {}]",
189                        name, temp, MIN_VALID_TEMP, MAX_VALID_TEMP
190                    ),
191                });
192
193                // Report but don't fail on temperature range violations
194                // Real-world data often has sensor errors that should be reported but not stop processing
195                // if self.strict_mode {
196                //     return Err(ProcessingError::TemperatureValidation {
197                //         message: format!("Temperature {} out of range for station {} on {}",
198                //             temp, record.station_id, record.date),
199                //     });
200                // }
201            }
202        }
203
204        Ok(())
205    }
206
207    /// Check time series integrity for temperature jumps
208    fn check_time_series_integrity(
209        &self,
210        station_id: u32,
211        records: &[&ConsolidatedRecord],
212        report: &mut IntegrityReport,
213    ) -> Result<()> {
214        for window in records.windows(2) {
215            let prev = window[0];
216            let curr = window[1];
217
218            // Check for suspicious temperature jumps
219            let temps = [
220                (prev.min_temp, curr.min_temp, "min"),
221                (prev.max_temp, curr.max_temp, "max"),
222                (prev.avg_temp, curr.avg_temp, "avg"),
223            ];
224
225            for (prev_temp, curr_temp, name) in temps {
226                if prev_temp != -9999.0 && curr_temp != -9999.0 {
227                    let jump = (curr_temp - prev_temp).abs();
228
229                    if jump > self.temperature_jump_threshold {
230                        report.temperature_violations.push(TemperatureViolation {
231                            station_id,
232                            date: curr.date,
233                            violation_type: ViolationType::SuspiciousJump,
234                            details: format!(
235                                "{} temperature jumped {:.1}°C from {} to {}",
236                                name, jump, prev.date, curr.date
237                            ),
238                        });
239                    }
240                }
241            }
242        }
243
244        Ok(())
245    }
246
247    /// Generate a summary report
248    pub fn generate_summary(&self, report: &IntegrityReport) -> String {
249        let mut summary = String::new();
250
251        summary.push_str("=== Integrity Check Report ===\n");
252        summary.push_str(&format!("Total Records: {}\n", report.total_records));
253        summary.push_str(&format!(
254            "Valid Records: {} ({:.1}%)\n",
255            report.valid_records,
256            100.0 * report.valid_records as f64 / report.total_records as f64
257        ));
258        summary.push_str(&format!(
259            "Suspect Records: {} ({:.1}%)\n",
260            report.suspect_records,
261            100.0 * report.suspect_records as f64 / report.total_records as f64
262        ));
263        summary.push_str(&format!(
264            "Invalid Records: {} ({:.1}%)\n",
265            report.invalid_records,
266            100.0 * report.invalid_records as f64 / report.total_records as f64
267        ));
268        summary.push_str(&format!(
269            "Missing Data Records: {}\n",
270            report.missing_data_records
271        ));
272        summary.push_str(&format!(
273            "\nTemperature Violations: {}\n",
274            report.temperature_violations.len()
275        ));
276
277        if !report.temperature_violations.is_empty() {
278            summary.push_str("\nTop 10 Violations:\n");
279            for (i, violation) in report.temperature_violations.iter().take(10).enumerate() {
280                summary.push_str(&format!(
281                    "  {}. Station {} on {}: {}\n",
282                    i + 1,
283                    violation.station_id,
284                    violation.date,
285                    violation.details
286                ));
287            }
288        }
289
290        summary
291    }
292}
293
294impl Default for IntegrityChecker {
295    fn default() -> Self {
296        Self::new()
297    }
298}