ecad_processor/analyzers/
weather_analyzer.rs1use crate::error::Result;
2use crate::models::ConsolidatedRecord;
3use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
4use chrono::NaiveDate;
5use std::collections::HashSet;
6use std::path::Path;
7
8fn is_valid_temperature(temp: f32) -> bool {
10 temp != -9999.0 && (MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp)
11}
12
13#[derive(Debug)]
14pub struct WeatherStatistics {
15 pub total_records: usize,
16 pub unique_stations: usize,
17 pub date_range: (NaiveDate, NaiveDate),
18 pub temperature_stats: TemperatureStats,
19 pub data_quality: DataQuality,
20 pub geographic_bounds: GeographicBounds,
21}
22
/// Temperature extremes and mean for an analysed batch of records.
///
/// The numeric fields may hold `f32::NAN` when no usable measurement was
/// available. Because `NaN != NaN`, two values that both carry `NaN` never
/// compare equal under the derived `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub struct TemperatureStats {
    /// Lowest accepted minimum temperature, in °C.
    pub min_temp: f32,
    /// Highest accepted maximum temperature, in °C.
    pub max_temp: f32,
    /// Mean of the accepted average temperatures, in °C.
    pub avg_temp: f32,
    /// Human-readable description of where/when `min_temp` was observed.
    pub min_temp_location: String,
    /// Human-readable description of where/when `max_temp` was observed.
    pub max_temp_location: String,
}
31
/// Record counts per quality category for an analysed batch.
///
/// The categories are counted independently of each other, so the counts are
/// not guaranteed to add up to `total_records`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct DataQuality {
    /// Number of records the counts below refer to.
    pub total_records: usize,
    /// Records counted as having valid data.
    pub valid_records: usize,
    /// Records counted as having suspect data.
    pub suspect_records: usize,
    /// Records counted as having missing data.
    pub missing_records: usize,
    /// Records counted as complete.
    pub complete_records: usize,
}

impl DataQuality {
    /// Share of valid records, as a percentage of `total_records`.
    ///
    /// Returns `0.0` when `total_records` is zero.
    pub fn valid_percentage(&self) -> f64 {
        self.percentage_of(self.valid_records)
    }

    /// Share of suspect records, as a percentage of `total_records`.
    ///
    /// Returns `0.0` when `total_records` is zero.
    pub fn suspect_percentage(&self) -> f64 {
        self.percentage_of(self.suspect_records)
    }

    /// Share of missing records, as a percentage of `total_records`.
    ///
    /// Returns `0.0` when `total_records` is zero.
    pub fn missing_percentage(&self) -> f64 {
        self.percentage_of(self.missing_records)
    }

    /// Expresses `count` as a percentage of `total_records`.
    fn percentage_of(&self, count: usize) -> f64 {
        // Dividing by zero would yield NaN, which then leaks into reports
        // as "NaN%"; an empty batch is reported as 0% instead.
        if self.total_records == 0 {
            return 0.0;
        }
        (count as f64 / self.total_records as f64) * 100.0
    }
}
54
/// Latitude/longitude extent covered by an analysed batch of records.
///
/// Values are in degrees; presumably positive means north/east — confirm
/// against the record source.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct GeographicBounds {
    /// Smallest latitude seen.
    pub min_lat: f64,
    /// Largest latitude seen.
    pub max_lat: f64,
    /// Smallest longitude seen.
    pub min_lon: f64,
    /// Largest longitude seen.
    pub max_lon: f64,
}
62
63pub struct WeatherAnalyzer;
64
65impl WeatherAnalyzer {
66 pub fn new() -> Self {
67 Self
68 }
69
70 pub fn analyze_parquet(&self, path: &Path) -> Result<WeatherStatistics> {
71 self.analyze_parquet_with_limit(path, 0) }
73
74 pub fn analyze_parquet_with_limit(
75 &self,
76 path: &Path,
77 limit: usize,
78 ) -> Result<WeatherStatistics> {
79 let writer = crate::writers::ParquetWriter::new();
80
81 let file_info = writer.get_file_info(path)?;
83 let total_rows = file_info.total_rows as usize;
84
85 let records_to_read = if limit == 0 {
87 total_rows } else {
89 limit.min(total_rows) };
91
92 let records = writer.read_sample_records(path, records_to_read)?;
93
94 if records.is_empty() {
95 return Err(crate::error::ProcessingError::Config(
96 "No records found in Parquet file".to_string(),
97 ));
98 }
99
100 self.calculate_statistics(&records)
101 }
102
103 fn calculate_statistics(&self, records: &[ConsolidatedRecord]) -> Result<WeatherStatistics> {
104 if records.is_empty() {
105 return Err(crate::error::ProcessingError::Config(
106 "No records to analyze".to_string(),
107 ));
108 }
109
110 let mut unique_stations = HashSet::new();
111 let mut min_date = records[0].date;
112 let mut max_date = records[0].date;
113 let mut min_temp = f32::INFINITY;
114 let mut max_temp = f32::NEG_INFINITY;
115 let mut temp_sum = 0.0f64;
116 let mut temp_count = 0;
117 let mut min_temp_location = String::new();
118 let mut max_temp_location = String::new();
119
120 let mut valid_count = 0;
121 let mut suspect_count = 0;
122 let mut missing_count = 0;
123 let mut complete_count = 0;
124
125 let mut min_lat = records[0].latitude;
126 let mut max_lat = records[0].latitude;
127 let mut min_lon = records[0].longitude;
128 let mut max_lon = records[0].longitude;
129
130 for record in records {
131 unique_stations.insert(record.station_id);
132
133 if record.date < min_date {
134 min_date = record.date;
135 }
136 if record.date > max_date {
137 max_date = record.date;
138 }
139
140 if is_valid_temperature(record.min_temp) && record.min_temp < min_temp {
142 min_temp = record.min_temp;
143 min_temp_location = format!("{} ({})", record.station_name, record.date);
144 }
145
146 if is_valid_temperature(record.max_temp) && record.max_temp > max_temp {
147 max_temp = record.max_temp;
148 max_temp_location = format!("{} ({})", record.station_name, record.date);
149 }
150
151 if is_valid_temperature(record.avg_temp) {
153 temp_sum += record.avg_temp as f64;
154 temp_count += 1;
155 }
156
157 if record.has_valid_data() {
158 valid_count += 1;
159 }
160 if record.has_suspect_data() {
161 suspect_count += 1;
162 }
163 if record.has_missing_data() {
164 missing_count += 1;
165 }
166 if record.is_complete() {
167 complete_count += 1;
168 }
169
170 if record.latitude < min_lat {
171 min_lat = record.latitude;
172 }
173 if record.latitude > max_lat {
174 max_lat = record.latitude;
175 }
176 if record.longitude < min_lon {
177 min_lon = record.longitude;
178 }
179 if record.longitude > max_lon {
180 max_lon = record.longitude;
181 }
182 }
183
184 let avg_temp = if temp_count > 0 {
186 (temp_sum / temp_count as f64) as f32
187 } else {
188 f32::NAN
189 };
190
191 if min_temp == f32::INFINITY {
193 min_temp = f32::NAN;
194 min_temp_location = "No valid measurements".to_string();
195 }
196 if max_temp == f32::NEG_INFINITY {
197 max_temp = f32::NAN;
198 max_temp_location = "No valid measurements".to_string();
199 }
200
201 Ok(WeatherStatistics {
202 total_records: records.len(),
203 unique_stations: unique_stations.len(),
204 date_range: (min_date, max_date),
205 temperature_stats: TemperatureStats {
206 min_temp,
207 max_temp,
208 avg_temp,
209 min_temp_location,
210 max_temp_location,
211 },
212 data_quality: DataQuality {
213 total_records: records.len(),
214 valid_records: valid_count,
215 suspect_records: suspect_count,
216 missing_records: missing_count,
217 complete_records: complete_count,
218 },
219 geographic_bounds: GeographicBounds {
220 min_lat,
221 max_lat,
222 min_lon,
223 max_lon,
224 },
225 })
226 }
227}
228
229impl WeatherStatistics {
230 pub fn summary(&self) -> String {
231 let temp_range = if self.temperature_stats.min_temp.is_nan()
232 || self.temperature_stats.max_temp.is_nan()
233 {
234 "No valid measurements".to_string()
235 } else {
236 format!(
237 "{:.1}°C to {:.1}°C",
238 self.temperature_stats.min_temp, self.temperature_stats.max_temp
239 )
240 };
241
242 format!(
243 "Weather Parameters: Temperature (min/max/avg)\n\
244 Stations: {} stations\n\
245 Date Range: {} to {} ({} years)\n\
246 Records: {} total\n\
247 Data Quality: {:.1}% valid, {:.1}% suspect, {:.1}% missing\n\
248 Temperature Range: {}\n\
249 Coverage: {:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E",
250 self.unique_stations,
251 self.date_range.0,
252 self.date_range.1,
253 (self
254 .date_range
255 .1
256 .signed_duration_since(self.date_range.0)
257 .num_days()
258 / 365),
259 self.total_records,
260 self.data_quality.valid_percentage(),
261 self.data_quality.suspect_percentage(),
262 self.data_quality.missing_percentage(),
263 temp_range,
264 self.geographic_bounds.min_lat,
265 self.geographic_bounds.max_lat,
266 self.geographic_bounds.min_lon.abs(),
267 self.geographic_bounds.max_lon
268 )
269 }
270
271 pub fn detailed_summary(&self) -> String {
272 let coldest = if self.temperature_stats.min_temp.is_nan() {
273 "No valid measurements".to_string()
274 } else {
275 format!(
276 "{:.1}°C at {}",
277 self.temperature_stats.min_temp, self.temperature_stats.min_temp_location
278 )
279 };
280
281 let hottest = if self.temperature_stats.max_temp.is_nan() {
282 "No valid measurements".to_string()
283 } else {
284 format!(
285 "{:.1}°C at {}",
286 self.temperature_stats.max_temp, self.temperature_stats.max_temp_location
287 )
288 };
289
290 let average = if self.temperature_stats.avg_temp.is_nan() {
291 "No valid measurements".to_string()
292 } else {
293 format!("{:.1}°C", self.temperature_stats.avg_temp)
294 };
295
296 format!(
297 "{}\n\n\
298 Extreme Temperatures (valid range only):\n\
299 - Coldest: {}\n\
300 - Hottest: {}\n\
301 - Average: {}\n\n\
302 Data Completeness:\n\
303 - Complete records: {}/{} ({:.1}%)",
304 self.summary(),
305 coldest,
306 hottest,
307 average,
308 self.data_quality.complete_records,
309 self.data_quality.total_records,
310 (self.data_quality.complete_records as f64 / self.data_quality.total_records as f64)
311 * 100.0
312 )
313 }
314}
315
316impl Default for WeatherAnalyzer {
317 fn default() -> Self {
318 Self::new()
319 }
320}