ecad_processor/processors/
integrity_checker.rs1use crate::error::Result;
2use crate::models::ConsolidatedRecord;
3use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
4use std::collections::HashMap;
5
6#[derive(Debug, Clone)]
7pub struct IntegrityReport {
8 pub total_records: usize,
9 pub valid_records: usize,
10 pub suspect_records: usize,
11 pub invalid_records: usize,
12 pub missing_data_records: usize,
13 pub temperature_violations: Vec<TemperatureViolation>,
14 pub station_statistics: HashMap<u32, StationStatistics>,
15}
16
17#[derive(Debug, Clone)]
18pub struct TemperatureViolation {
19 pub station_id: u32,
20 pub date: chrono::NaiveDate,
21 pub violation_type: ViolationType,
22 pub details: String,
23}
24
/// Category of a [`TemperatureViolation`].
///
/// The enum is fieldless, so it also derives `Copy`, `PartialEq`, `Eq` and `Hash`.
/// Callers can then compare kinds (`v.violation_type == ViolationType::OutOfRange`)
/// or use them as map/set keys when tallying violations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ViolationType {
    /// The minimum temperature exceeds the average temperature.
    MinGreaterThanAvg,
    /// The average temperature exceeds the maximum temperature.
    AvgGreaterThanMax,
    /// A temperature lies outside the configured valid range.
    OutOfRange,
    /// Consecutive records of a station differ by more than the jump threshold.
    SuspiciousJump,
}
32
/// Running statistics for a single station, built while checking its records.
///
/// All counters start at zero and all temperature fields start as `None`
/// (see the `Default` derive).
#[derive(Clone, Debug, Default)]
pub struct StationStatistics {
    /// Number of records seen for this station.
    pub total_records: usize,
    /// Records of this station classified as valid.
    pub valid_records: usize,
    /// Records of this station classified as suspect.
    pub suspect_records: usize,
    /// Records of this station reporting missing data.
    pub missing_data_records: usize,
    /// Lowest temperature observed, if any was recorded.
    pub min_temp: Option<f32>,
    /// Highest temperature observed, if any was recorded.
    pub max_temp: Option<f32>,
    /// Average temperature, if the producer computed one.
    pub avg_temp: Option<f32>,
}
43
/// Configurable validator that checks consolidated temperature records for
/// internal consistency, out-of-range values and suspicious jumps.
///
/// The checker holds only plain configuration, so it is cheap to copy and can be
/// compared and printed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct IntegrityChecker {
    /// Largest allowed absolute change (°C) between consecutive records of the
    /// same station before a `SuspiciousJump` violation is reported.
    temperature_jump_threshold: f32,
}
47
48impl IntegrityChecker {
49 pub fn new() -> Self {
50 Self {
51 temperature_jump_threshold: 20.0, }
53 }
54
55 pub fn with_strict_mode(_strict_mode: bool) -> Self {
56 Self {
57 temperature_jump_threshold: 20.0,
58 }
59 }
60
61 pub fn check_integrity(&self, records: &[ConsolidatedRecord]) -> Result<IntegrityReport> {
63 let mut report = IntegrityReport {
64 total_records: records.len(),
65 valid_records: 0,
66 suspect_records: 0,
67 invalid_records: 0,
68 missing_data_records: 0,
69 temperature_violations: Vec::new(),
70 station_statistics: HashMap::new(),
71 };
72
73 let mut station_records: HashMap<u32, Vec<&ConsolidatedRecord>> = HashMap::new();
75 for record in records {
76 station_records
77 .entry(record.station_id)
78 .or_default()
79 .push(record);
80 }
81
82 for records in station_records.values_mut() {
84 records.sort_by_key(|r| r.date);
85 }
86
87 for record in records {
89 self.check_record(record, &mut report)?;
90
91 let stats = report
93 .station_statistics
94 .entry(record.station_id)
95 .or_default();
96
97 stats.total_records += 1;
98
99 if record.has_valid_data() {
100 stats.valid_records += 1;
101 } else if record.has_suspect_data() {
102 stats.suspect_records += 1;
103 }
104
105 if record.has_missing_data() {
106 stats.missing_data_records += 1;
107 }
108
109 if record.min_temp != -9999.0 {
111 stats.min_temp = Some(
112 stats
113 .min_temp
114 .map_or(record.min_temp, |t| t.min(record.min_temp)),
115 );
116 stats.max_temp = Some(
117 stats
118 .max_temp
119 .map_or(record.min_temp, |t| t.max(record.min_temp)),
120 );
121 }
122 }
123
124 for (station_id, records) in station_records {
126 self.check_time_series_integrity(station_id, &records, &mut report)?;
127 }
128
129 Ok(report)
130 }
131
132 fn check_record(
134 &self,
135 record: &ConsolidatedRecord,
136 report: &mut IntegrityReport,
137 ) -> Result<()> {
138 record
140 .validate_relationships()
141 .inspect_err(|e| {
142 report.temperature_violations.push(TemperatureViolation {
143 station_id: record.station_id,
144 date: record.date,
145 violation_type: ViolationType::MinGreaterThanAvg,
146 details: e.to_string(),
147 });
148 })
149 .ok();
150
151 self.check_temperature_ranges(record, report)?;
153
154 if record.has_valid_data() {
156 report.valid_records += 1;
157 } else if record.has_suspect_data() {
158 report.suspect_records += 1;
159 } else {
160 report.invalid_records += 1;
161 }
162
163 if record.has_missing_data() {
164 report.missing_data_records += 1;
165 }
166
167 Ok(())
168 }
169
170 fn check_temperature_ranges(
172 &self,
173 record: &ConsolidatedRecord,
174 report: &mut IntegrityReport,
175 ) -> Result<()> {
176 let temps = [
177 (record.min_temp, "min"),
178 (record.max_temp, "max"),
179 (record.avg_temp, "avg"),
180 ];
181
182 for (temp, name) in temps {
183 if temp != -9999.0 && !(MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp) {
184 report.temperature_violations.push(TemperatureViolation {
185 station_id: record.station_id,
186 date: record.date,
187 violation_type: ViolationType::OutOfRange,
188 details: format!(
189 "{} temperature {} is outside valid range [{}, {}]",
190 name, temp, MIN_VALID_TEMP, MAX_VALID_TEMP
191 ),
192 });
193
194 }
203 }
204
205 Ok(())
206 }
207
208 fn check_time_series_integrity(
210 &self,
211 station_id: u32,
212 records: &[&ConsolidatedRecord],
213 report: &mut IntegrityReport,
214 ) -> Result<()> {
215 for window in records.windows(2) {
216 let prev = window[0];
217 let curr = window[1];
218
219 let temps = [
221 (prev.min_temp, curr.min_temp, "min"),
222 (prev.max_temp, curr.max_temp, "max"),
223 (prev.avg_temp, curr.avg_temp, "avg"),
224 ];
225
226 for (prev_temp, curr_temp, name) in temps {
227 if prev_temp != -9999.0 && curr_temp != -9999.0 {
228 let jump = (curr_temp - prev_temp).abs();
229
230 if jump > self.temperature_jump_threshold {
231 report.temperature_violations.push(TemperatureViolation {
232 station_id,
233 date: curr.date,
234 violation_type: ViolationType::SuspiciousJump,
235 details: format!(
236 "{} temperature jumped {:.1}°C from {} to {}",
237 name, jump, prev.date, curr.date
238 ),
239 });
240 }
241 }
242 }
243 }
244
245 Ok(())
246 }
247
248 pub fn generate_summary(&self, report: &IntegrityReport) -> String {
250 let mut summary = String::new();
251
252 summary.push_str("=== Integrity Check Report ===\n");
253 summary.push_str(&format!("Total Records: {}\n", report.total_records));
254 summary.push_str(&format!(
255 "Valid Records: {} ({:.1}%)\n",
256 report.valid_records,
257 100.0 * report.valid_records as f64 / report.total_records as f64
258 ));
259 summary.push_str(&format!(
260 "Suspect Records: {} ({:.1}%)\n",
261 report.suspect_records,
262 100.0 * report.suspect_records as f64 / report.total_records as f64
263 ));
264 summary.push_str(&format!(
265 "Invalid Records: {} ({:.1}%)\n",
266 report.invalid_records,
267 100.0 * report.invalid_records as f64 / report.total_records as f64
268 ));
269 summary.push_str(&format!(
270 "Missing Data Records: {}\n",
271 report.missing_data_records
272 ));
273 summary.push_str(&format!(
274 "\nTemperature Violations: {}\n",
275 report.temperature_violations.len()
276 ));
277
278 if !report.temperature_violations.is_empty() {
279 summary.push_str("\nTop 10 Violations:\n");
280 for (i, violation) in report.temperature_violations.iter().take(10).enumerate() {
281 summary.push_str(&format!(
282 " {}. Station {} on {}: {}\n",
283 i + 1,
284 violation.station_id,
285 violation.date,
286 violation.details
287 ));
288 }
289 }
290
291 summary
292 }
293}
294
295impl Default for IntegrityChecker {
296 fn default() -> Self {
297 Self::new()
298 }
299}