ecad_processor/processors/
integrity_checker.rs

1use crate::error::Result;
2use crate::models::ConsolidatedRecord;
3use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
4use std::collections::HashMap;
5
/// Aggregate outcome of one `IntegrityChecker::check_integrity` run.
#[derive(Debug, Clone)]
pub struct IntegrityReport {
    /// Number of records that were handed to the check.
    pub total_records: usize,
    /// Records for which `has_valid_data()` returned `true`.
    pub valid_records: usize,
    /// Records that were not valid but for which `has_suspect_data()` returned `true`.
    pub suspect_records: usize,
    /// Records that were neither valid nor suspect.
    pub invalid_records: usize,
    /// Records for which `has_missing_data()` returned `true`; counted independently
    /// of the valid/suspect/invalid classification.
    pub missing_data_records: usize,
    /// Every violation found: per-record findings first, followed by time-series jumps.
    pub temperature_violations: Vec<TemperatureViolation>,
    /// Per-station counters and temperature extremes, keyed by station id.
    pub station_statistics: HashMap<u32, StationStatistics>,
}
16
/// A single problem found in the temperature data of one station on one date.
#[derive(Debug, Clone)]
pub struct TemperatureViolation {
    /// Station the offending record belongs to.
    pub station_id: u32,
    /// Date of the offending record; for jumps this is the later of the two records compared.
    pub date: chrono::NaiveDate,
    /// Category of the problem.
    pub violation_type: ViolationType,
    /// Human-readable description (an error message or a formatted explanation).
    pub details: String,
}
24
/// Category of a [`TemperatureViolation`].
///
/// `PartialEq`/`Eq` are derived so callers and tests can filter or compare
/// violations by category with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ViolationType {
    /// Relationship check failed; per the variant name, the minimum temperature is
    /// above the average.
    MinGreaterThanAvg,
    /// Relationship check failed; per the variant name, the average temperature is
    /// above the maximum.
    AvgGreaterThanMax,
    /// A temperature lies outside the `[MIN_VALID_TEMP, MAX_VALID_TEMP]` range.
    OutOfRange,
    /// A temperature changed by more than the checker's jump threshold from one
    /// record to the next for the same station.
    SuspiciousJump,
}
32
/// Per-station counters and temperature extremes collected during an integrity check.
///
/// `Default` yields all-zero counters and `None` temperatures, which is the state a
/// station starts from when its first record is seen.
#[derive(Debug, Clone, Default)]
pub struct StationStatistics {
    /// Number of records seen for the station.
    pub total_records: usize,
    /// Records for which `has_valid_data()` returned `true`.
    pub valid_records: usize,
    /// Records that were not valid but for which `has_suspect_data()` returned `true`.
    pub suspect_records: usize,
    /// Records for which `has_missing_data()` returned `true`.
    pub missing_data_records: usize,
    /// Lowest temperature recorded for the station, or `None` if no usable value was seen.
    pub min_temp: Option<f32>,
    /// Highest temperature recorded for the station, or `None` if no usable value was seen.
    pub max_temp: Option<f32>,
    /// NOTE(review): nothing visible in this file ever assigns this field, so it stays
    /// `None` after an integrity check — confirm whether it is populated elsewhere.
    pub avg_temp: Option<f32>,
}
43
/// Validates consolidated temperature records and produces an `IntegrityReport`.
pub struct IntegrityChecker {
    /// Largest absolute temperature change tolerated from one record of a station to
    /// the next; larger changes are reported as `ViolationType::SuspiciousJump`.
    temperature_jump_threshold: f32,
}
47
48impl IntegrityChecker {
49    pub fn new() -> Self {
50        Self {
51            temperature_jump_threshold: 20.0, // 20°C jump between consecutive days
52        }
53    }
54
55    pub fn with_strict_mode(_strict_mode: bool) -> Self {
56        Self {
57            temperature_jump_threshold: 20.0,
58        }
59    }
60
61    /// Check integrity of consolidated records
62    pub fn check_integrity(&self, records: &[ConsolidatedRecord]) -> Result<IntegrityReport> {
63        let mut report = IntegrityReport {
64            total_records: records.len(),
65            valid_records: 0,
66            suspect_records: 0,
67            invalid_records: 0,
68            missing_data_records: 0,
69            temperature_violations: Vec::new(),
70            station_statistics: HashMap::new(),
71        };
72
73        // Group records by station for time series checks
74        let mut station_records: HashMap<u32, Vec<&ConsolidatedRecord>> = HashMap::new();
75        for record in records {
76            station_records
77                .entry(record.station_id)
78                .or_default()
79                .push(record);
80        }
81
82        // Sort each station's records by date
83        for records in station_records.values_mut() {
84            records.sort_by_key(|r| r.date);
85        }
86
87        // Check each record
88        for record in records {
89            self.check_record(record, &mut report)?;
90
91            // Update station statistics
92            let stats = report
93                .station_statistics
94                .entry(record.station_id)
95                .or_default();
96
97            stats.total_records += 1;
98
99            if record.has_valid_data() {
100                stats.valid_records += 1;
101            } else if record.has_suspect_data() {
102                stats.suspect_records += 1;
103            }
104
105            if record.has_missing_data() {
106                stats.missing_data_records += 1;
107            }
108
109            // Update temperature ranges
110            if record.min_temp != -9999.0 {
111                stats.min_temp = Some(
112                    stats
113                        .min_temp
114                        .map_or(record.min_temp, |t| t.min(record.min_temp)),
115                );
116                stats.max_temp = Some(
117                    stats
118                        .max_temp
119                        .map_or(record.min_temp, |t| t.max(record.min_temp)),
120                );
121            }
122        }
123
124        // Check time series integrity
125        for (station_id, records) in station_records {
126            self.check_time_series_integrity(station_id, &records, &mut report)?;
127        }
128
129        Ok(report)
130    }
131
132    /// Check individual record integrity
133    fn check_record(
134        &self,
135        record: &ConsolidatedRecord,
136        report: &mut IntegrityReport,
137    ) -> Result<()> {
138        // Validate basic constraints
139        record
140            .validate_relationships()
141            .inspect_err(|e| {
142                report.temperature_violations.push(TemperatureViolation {
143                    station_id: record.station_id,
144                    date: record.date,
145                    violation_type: ViolationType::MinGreaterThanAvg,
146                    details: e.to_string(),
147                });
148            })
149            .ok();
150
151        // Check temperature ranges
152        self.check_temperature_ranges(record, report)?;
153
154        // Count record types
155        if record.has_valid_data() {
156            report.valid_records += 1;
157        } else if record.has_suspect_data() {
158            report.suspect_records += 1;
159        } else {
160            report.invalid_records += 1;
161        }
162
163        if record.has_missing_data() {
164            report.missing_data_records += 1;
165        }
166
167        Ok(())
168    }
169
170    /// Check if temperatures are within valid ranges
171    fn check_temperature_ranges(
172        &self,
173        record: &ConsolidatedRecord,
174        report: &mut IntegrityReport,
175    ) -> Result<()> {
176        let temps = [
177            (record.min_temp, "min"),
178            (record.max_temp, "max"),
179            (record.avg_temp, "avg"),
180        ];
181
182        for (temp, name) in temps {
183            if temp != -9999.0 && !(MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp) {
184                report.temperature_violations.push(TemperatureViolation {
185                    station_id: record.station_id,
186                    date: record.date,
187                    violation_type: ViolationType::OutOfRange,
188                    details: format!(
189                        "{} temperature {} is outside valid range [{}, {}]",
190                        name, temp, MIN_VALID_TEMP, MAX_VALID_TEMP
191                    ),
192                });
193
194                // Report but don't fail on temperature range violations
195                // Real-world data often has sensor errors that should be reported but not stop processing
196                // if self.strict_mode {
197                //     return Err(ProcessingError::TemperatureValidation {
198                //         message: format!("Temperature {} out of range for station {} on {}",
199                //             temp, record.station_id, record.date),
200                //     });
201                // }
202            }
203        }
204
205        Ok(())
206    }
207
208    /// Check time series integrity for temperature jumps
209    fn check_time_series_integrity(
210        &self,
211        station_id: u32,
212        records: &[&ConsolidatedRecord],
213        report: &mut IntegrityReport,
214    ) -> Result<()> {
215        for window in records.windows(2) {
216            let prev = window[0];
217            let curr = window[1];
218
219            // Check for suspicious temperature jumps
220            let temps = [
221                (prev.min_temp, curr.min_temp, "min"),
222                (prev.max_temp, curr.max_temp, "max"),
223                (prev.avg_temp, curr.avg_temp, "avg"),
224            ];
225
226            for (prev_temp, curr_temp, name) in temps {
227                if prev_temp != -9999.0 && curr_temp != -9999.0 {
228                    let jump = (curr_temp - prev_temp).abs();
229
230                    if jump > self.temperature_jump_threshold {
231                        report.temperature_violations.push(TemperatureViolation {
232                            station_id,
233                            date: curr.date,
234                            violation_type: ViolationType::SuspiciousJump,
235                            details: format!(
236                                "{} temperature jumped {:.1}°C from {} to {}",
237                                name, jump, prev.date, curr.date
238                            ),
239                        });
240                    }
241                }
242            }
243        }
244
245        Ok(())
246    }
247
248    /// Generate a summary report
249    pub fn generate_summary(&self, report: &IntegrityReport) -> String {
250        let mut summary = String::new();
251
252        summary.push_str("=== Integrity Check Report ===\n");
253        summary.push_str(&format!("Total Records: {}\n", report.total_records));
254        summary.push_str(&format!(
255            "Valid Records: {} ({:.1}%)\n",
256            report.valid_records,
257            100.0 * report.valid_records as f64 / report.total_records as f64
258        ));
259        summary.push_str(&format!(
260            "Suspect Records: {} ({:.1}%)\n",
261            report.suspect_records,
262            100.0 * report.suspect_records as f64 / report.total_records as f64
263        ));
264        summary.push_str(&format!(
265            "Invalid Records: {} ({:.1}%)\n",
266            report.invalid_records,
267            100.0 * report.invalid_records as f64 / report.total_records as f64
268        ));
269        summary.push_str(&format!(
270            "Missing Data Records: {}\n",
271            report.missing_data_records
272        ));
273        summary.push_str(&format!(
274            "\nTemperature Violations: {}\n",
275            report.temperature_violations.len()
276        ));
277
278        if !report.temperature_violations.is_empty() {
279            summary.push_str("\nTop 10 Violations:\n");
280            for (i, violation) in report.temperature_violations.iter().take(10).enumerate() {
281                summary.push_str(&format!(
282                    "  {}. Station {} on {}: {}\n",
283                    i + 1,
284                    violation.station_id,
285                    violation.date,
286                    violation.details
287                ));
288            }
289        }
290
291        summary
292    }
293}
294
295impl Default for IntegrityChecker {
296    fn default() -> Self {
297        Self::new()
298    }
299}