// ecad_processor/processors/integrity_checker.rs
use crate::error::Result;
use crate::models::ConsolidatedRecord;
use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
use std::collections::HashMap;
use std::fmt::Write;

6#[derive(Debug, Clone)]
7pub struct IntegrityReport {
8 pub total_records: usize,
9 pub valid_records: usize,
10 pub suspect_records: usize,
11 pub invalid_records: usize,
12 pub missing_data_records: usize,
13 pub temperature_violations: Vec<TemperatureViolation>,
14 pub station_statistics: HashMap<u32, StationStatistics>,
15}
16
17#[derive(Debug, Clone)]
18pub struct TemperatureViolation {
19 pub station_id: u32,
20 pub date: chrono::NaiveDate,
21 pub violation_type: ViolationType,
22 pub details: String,
23}
24
/// Category of a temperature violation reported by the integrity checker.
///
/// The enum carries no data, so it is `Copy` and can be compared or hashed, which lets
/// callers filter or bucket violations by kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ViolationType {
    /// The daily minimum exceeds the daily average. Also used for relationship-check
    /// failures that are not classified more specifically.
    MinGreaterThanAvg,
    /// The daily average exceeds the daily maximum.
    AvgGreaterThanMax,
    /// A reading lies outside the configured valid temperature range.
    OutOfRange,
    /// Two consecutive records of a station differ by more than the jump threshold.
    SuspiciousJump,
}

/// Per-station counters and temperature extremes gathered during an integrity check.
#[derive(Clone, Debug, Default)]
pub struct StationStatistics {
    /// Number of records seen for this station.
    pub total_records: usize,
    /// Records for this station whose data reports itself as valid.
    pub valid_records: usize,
    /// Records for this station that are not valid but carry suspect data.
    pub suspect_records: usize,
    /// Records for this station reporting missing data.
    pub missing_data_records: usize,
    /// Lowest temperature recorded for the station, or `None` if none was recorded.
    pub min_temp: Option<f32>,
    /// Highest temperature recorded for the station, or `None` if none was recorded.
    pub max_temp: Option<f32>,
    /// Mean temperature for the station, or `None` when not computed or no data was present.
    pub avg_temp: Option<f32>,
}

/// Runs per-record and time-series sanity checks over consolidated temperature records.
#[derive(Debug, Clone)]
pub struct IntegrityChecker {
    /// Largest allowed change (°C) between consecutive records of one station; strictly
    /// larger changes are reported as suspicious jumps.
    temperature_jump_threshold: f32,
}

48impl IntegrityChecker {
49 pub fn new() -> Self {
50 Self {
51 temperature_jump_threshold: 20.0, }
53 }
54
55 pub fn with_strict_mode(_strict_mode: bool) -> Self {
56 Self {
57 temperature_jump_threshold: 20.0,
58 }
59 }
60
61 pub fn check_integrity(&self, records: &[ConsolidatedRecord]) -> Result<IntegrityReport> {
63 let mut report = IntegrityReport {
64 total_records: records.len(),
65 valid_records: 0,
66 suspect_records: 0,
67 invalid_records: 0,
68 missing_data_records: 0,
69 temperature_violations: Vec::new(),
70 station_statistics: HashMap::new(),
71 };
72
73 let mut station_records: HashMap<u32, Vec<&ConsolidatedRecord>> = HashMap::new();
75 for record in records {
76 station_records
77 .entry(record.station_id)
78 .or_default()
79 .push(record);
80 }
81
82 for records in station_records.values_mut() {
84 records.sort_by_key(|r| r.date);
85 }
86
87 for record in records {
89 self.check_record(record, &mut report)?;
90
91 let stats = report
93 .station_statistics
94 .entry(record.station_id)
95 .or_default();
96
97 stats.total_records += 1;
98
99 if record.has_valid_data() {
100 stats.valid_records += 1;
101 } else if record.has_suspect_data() {
102 stats.suspect_records += 1;
103 }
104
105 if record.has_missing_data() {
106 stats.missing_data_records += 1;
107 }
108
109 if record.min_temp != -9999.0 {
111 stats.min_temp = Some(
112 stats
113 .min_temp
114 .map_or(record.min_temp, |t| t.min(record.min_temp)),
115 );
116 stats.max_temp = Some(
117 stats
118 .max_temp
119 .map_or(record.min_temp, |t| t.max(record.min_temp)),
120 );
121 }
122 }
123
124 for (station_id, records) in station_records {
126 self.check_time_series_integrity(station_id, &records, &mut report)?;
127 }
128
129 Ok(report)
130 }
131
132 fn check_record(
134 &self,
135 record: &ConsolidatedRecord,
136 report: &mut IntegrityReport,
137 ) -> Result<()> {
138 let validation_result = record.validate_relationships();
140
141 if let Err(e) = validation_result {
142 report.temperature_violations.push(TemperatureViolation {
143 station_id: record.station_id,
144 date: record.date,
145 violation_type: ViolationType::MinGreaterThanAvg,
146 details: e.to_string(),
147 });
148 }
149
150 self.check_temperature_ranges(record, report)?;
152
153 if record.has_valid_data() {
155 report.valid_records += 1;
156 } else if record.has_suspect_data() {
157 report.suspect_records += 1;
158 } else {
159 report.invalid_records += 1;
160 }
161
162 if record.has_missing_data() {
163 report.missing_data_records += 1;
164 }
165
166 Ok(())
167 }
168
169 fn check_temperature_ranges(
171 &self,
172 record: &ConsolidatedRecord,
173 report: &mut IntegrityReport,
174 ) -> Result<()> {
175 let temps = [
176 (record.min_temp, "min"),
177 (record.max_temp, "max"),
178 (record.avg_temp, "avg"),
179 ];
180
181 for (temp, name) in temps {
182 if temp != -9999.0 && !(MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp) {
183 report.temperature_violations.push(TemperatureViolation {
184 station_id: record.station_id,
185 date: record.date,
186 violation_type: ViolationType::OutOfRange,
187 details: format!(
188 "{} temperature {} is outside valid range [{}, {}]",
189 name, temp, MIN_VALID_TEMP, MAX_VALID_TEMP
190 ),
191 });
192
193 }
202 }
203
204 Ok(())
205 }
206
207 fn check_time_series_integrity(
209 &self,
210 station_id: u32,
211 records: &[&ConsolidatedRecord],
212 report: &mut IntegrityReport,
213 ) -> Result<()> {
214 for window in records.windows(2) {
215 let prev = window[0];
216 let curr = window[1];
217
218 let temps = [
220 (prev.min_temp, curr.min_temp, "min"),
221 (prev.max_temp, curr.max_temp, "max"),
222 (prev.avg_temp, curr.avg_temp, "avg"),
223 ];
224
225 for (prev_temp, curr_temp, name) in temps {
226 if prev_temp != -9999.0 && curr_temp != -9999.0 {
227 let jump = (curr_temp - prev_temp).abs();
228
229 if jump > self.temperature_jump_threshold {
230 report.temperature_violations.push(TemperatureViolation {
231 station_id,
232 date: curr.date,
233 violation_type: ViolationType::SuspiciousJump,
234 details: format!(
235 "{} temperature jumped {:.1}°C from {} to {}",
236 name, jump, prev.date, curr.date
237 ),
238 });
239 }
240 }
241 }
242 }
243
244 Ok(())
245 }
246
247 pub fn generate_summary(&self, report: &IntegrityReport) -> String {
249 let mut summary = String::new();
250
251 summary.push_str("=== Integrity Check Report ===\n");
252 summary.push_str(&format!("Total Records: {}\n", report.total_records));
253 summary.push_str(&format!(
254 "Valid Records: {} ({:.1}%)\n",
255 report.valid_records,
256 100.0 * report.valid_records as f64 / report.total_records as f64
257 ));
258 summary.push_str(&format!(
259 "Suspect Records: {} ({:.1}%)\n",
260 report.suspect_records,
261 100.0 * report.suspect_records as f64 / report.total_records as f64
262 ));
263 summary.push_str(&format!(
264 "Invalid Records: {} ({:.1}%)\n",
265 report.invalid_records,
266 100.0 * report.invalid_records as f64 / report.total_records as f64
267 ));
268 summary.push_str(&format!(
269 "Missing Data Records: {}\n",
270 report.missing_data_records
271 ));
272 summary.push_str(&format!(
273 "\nTemperature Violations: {}\n",
274 report.temperature_violations.len()
275 ));
276
277 if !report.temperature_violations.is_empty() {
278 summary.push_str("\nTop 10 Violations:\n");
279 for (i, violation) in report.temperature_violations.iter().take(10).enumerate() {
280 summary.push_str(&format!(
281 " {}. Station {} on {}: {}\n",
282 i + 1,
283 violation.station_id,
284 violation.date,
285 violation.details
286 ));
287 }
288 }
289
290 summary
291 }
292}
293
294impl Default for IntegrityChecker {
295 fn default() -> Self {
296 Self::new()
297 }
298}