ecad_processor/analyzers/
weather_analyzer.rs

1use crate::error::Result;
2use crate::models::ConsolidatedRecord;
3use crate::utils::constants::{MAX_VALID_TEMP, MIN_VALID_TEMP};
4use chrono::NaiveDate;
5use std::collections::HashSet;
6use std::path::Path;
7
8/// Check if a temperature value is within the valid range and not a missing value
9fn is_valid_temperature(temp: f32) -> bool {
10    temp != -9999.0 && (MIN_VALID_TEMP..=MAX_VALID_TEMP).contains(&temp)
11}
12
/// Aggregate statistics computed over a set of consolidated weather records.
#[derive(Debug)]
pub struct WeatherStatistics {
    /// Number of records that were analyzed.
    pub total_records: usize,
    /// Number of distinct station IDs seen among the analyzed records.
    pub unique_stations: usize,
    /// Earliest and latest record dates, as `(earliest, latest)`.
    pub date_range: (NaiveDate, NaiveDate),
    /// Temperature extremes and mean over readings that passed validation.
    pub temperature_stats: TemperatureStats,
    /// Per-category record counts (valid, suspect, missing, complete).
    pub data_quality: DataQuality,
    /// Smallest and largest latitude/longitude among the analyzed records.
    pub geographic_bounds: GeographicBounds,
}
22
/// Temperature extremes and mean over the readings that passed validation.
///
/// The numeric fields hold `NaN` when no valid reading was available for
/// them; because `NaN != NaN`, two such values never compare equal.
#[derive(Debug, Clone, PartialEq)]
pub struct TemperatureStats {
    /// Lowest valid minimum-temperature reading, or `NaN` if there was none.
    pub min_temp: f32,
    /// Highest valid maximum-temperature reading, or `NaN` if there was none.
    pub max_temp: f32,
    /// Mean of the valid average-temperature readings, or `NaN` if there was none.
    pub avg_temp: f32,
    /// Where the lowest reading was observed, formatted as `"<station name> (<date>)"`,
    /// or `"No valid measurements"` when there was none.
    pub min_temp_location: String,
    /// Where the highest reading was observed, formatted as `"<station name> (<date>)"`,
    /// or `"No valid measurements"` when there was none.
    pub max_temp_location: String,
}
31
/// Record counts describing the quality of an analyzed data set.
///
/// The categories are counted independently of one another, so a single
/// record may contribute to more than one of them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DataQuality {
    /// Number of records that were analyzed; the denominator for all percentages.
    pub total_records: usize,
    /// Records counted as having valid data.
    pub valid_records: usize,
    /// Records counted as having suspect data.
    pub suspect_records: usize,
    /// Records counted as having missing data.
    pub missing_records: usize,
    /// Records counted as complete.
    pub complete_records: usize,
}

impl DataQuality {
    /// Expresses `count` as a percentage of `total_records`.
    ///
    /// Returns `0.0` when `total_records` is zero so that an empty data set
    /// yields a printable number instead of `NaN` (the result of `0.0 / 0.0`).
    fn percentage_of(&self, count: usize) -> f64 {
        if self.total_records == 0 {
            return 0.0;
        }
        (count as f64 / self.total_records as f64) * 100.0
    }

    /// Percentage of records counted as valid (`0.0` for an empty data set).
    pub fn valid_percentage(&self) -> f64 {
        self.percentage_of(self.valid_records)
    }

    /// Percentage of records counted as suspect (`0.0` for an empty data set).
    pub fn suspect_percentage(&self) -> f64 {
        self.percentage_of(self.suspect_records)
    }

    /// Percentage of records counted as missing (`0.0` for an empty data set).
    pub fn missing_percentage(&self) -> f64 {
        self.percentage_of(self.missing_records)
    }
}
54
/// Bounding box of the analyzed records, stored as signed coordinates.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct GeographicBounds {
    /// Smallest latitude among the analyzed records.
    pub min_lat: f64,
    /// Largest latitude among the analyzed records.
    pub max_lat: f64,
    /// Smallest longitude among the analyzed records.
    pub min_lon: f64,
    /// Largest longitude among the analyzed records.
    pub max_lon: f64,
}
62
63pub struct WeatherAnalyzer;
64
65impl WeatherAnalyzer {
66    pub fn new() -> Self {
67        Self
68    }
69
70    pub fn analyze_parquet(&self, path: &Path) -> Result<WeatherStatistics> {
71        self.analyze_parquet_with_limit(path, 0) // Default to all records
72    }
73
74    pub fn analyze_parquet_with_limit(
75        &self,
76        path: &Path,
77        limit: usize,
78    ) -> Result<WeatherStatistics> {
79        let writer = crate::writers::ParquetWriter::new();
80
81        // Get file info to determine how many records to read
82        let file_info = writer.get_file_info(path)?;
83        let total_rows = file_info.total_rows as usize;
84
85        // Determine how many records to read
86        let records_to_read = if limit == 0 {
87            total_rows // 0 means read all records
88        } else {
89            limit.min(total_rows) // Read up to limit, but not more than available
90        };
91
92        let records = writer.read_sample_records(path, records_to_read)?;
93
94        if records.is_empty() {
95            return Err(crate::error::ProcessingError::Config(
96                "No records found in Parquet file".to_string(),
97            ));
98        }
99
100        self.calculate_statistics(&records)
101    }
102
103    fn calculate_statistics(&self, records: &[ConsolidatedRecord]) -> Result<WeatherStatistics> {
104        if records.is_empty() {
105            return Err(crate::error::ProcessingError::Config(
106                "No records to analyze".to_string(),
107            ));
108        }
109
110        let mut unique_stations = HashSet::new();
111        let mut min_date = records[0].date;
112        let mut max_date = records[0].date;
113        let mut min_temp = f32::INFINITY;
114        let mut max_temp = f32::NEG_INFINITY;
115        let mut temp_sum = 0.0f64;
116        let mut temp_count = 0;
117        let mut min_temp_location = String::new();
118        let mut max_temp_location = String::new();
119
120        let mut valid_count = 0;
121        let mut suspect_count = 0;
122        let mut missing_count = 0;
123        let mut complete_count = 0;
124
125        let mut min_lat = records[0].latitude;
126        let mut max_lat = records[0].latitude;
127        let mut min_lon = records[0].longitude;
128        let mut max_lon = records[0].longitude;
129
130        for record in records {
131            unique_stations.insert(record.station_id);
132
133            if record.date < min_date {
134                min_date = record.date;
135            }
136            if record.date > max_date {
137                max_date = record.date;
138            }
139
140            // Only include temperatures within valid range for statistics
141            if is_valid_temperature(record.min_temp) && record.min_temp < min_temp {
142                min_temp = record.min_temp;
143                min_temp_location = format!("{} ({})", record.station_name, record.date);
144            }
145
146            if is_valid_temperature(record.max_temp) && record.max_temp > max_temp {
147                max_temp = record.max_temp;
148                max_temp_location = format!("{} ({})", record.station_name, record.date);
149            }
150
151            // Only include valid average temperatures in the overall average
152            if is_valid_temperature(record.avg_temp) {
153                temp_sum += record.avg_temp as f64;
154                temp_count += 1;
155            }
156
157            if record.has_valid_data() {
158                valid_count += 1;
159            }
160            if record.has_suspect_data() {
161                suspect_count += 1;
162            }
163            if record.has_missing_data() {
164                missing_count += 1;
165            }
166            if record.is_complete() {
167                complete_count += 1;
168            }
169
170            if record.latitude < min_lat {
171                min_lat = record.latitude;
172            }
173            if record.latitude > max_lat {
174                max_lat = record.latitude;
175            }
176            if record.longitude < min_lon {
177                min_lon = record.longitude;
178            }
179            if record.longitude > max_lon {
180                max_lon = record.longitude;
181            }
182        }
183
184        // Handle case where no valid temperatures were found
185        let avg_temp = if temp_count > 0 {
186            (temp_sum / temp_count as f64) as f32
187        } else {
188            f32::NAN
189        };
190
191        // Handle case where min/max are still infinity (no valid temperatures)
192        if min_temp == f32::INFINITY {
193            min_temp = f32::NAN;
194            min_temp_location = "No valid measurements".to_string();
195        }
196        if max_temp == f32::NEG_INFINITY {
197            max_temp = f32::NAN;
198            max_temp_location = "No valid measurements".to_string();
199        }
200
201        Ok(WeatherStatistics {
202            total_records: records.len(),
203            unique_stations: unique_stations.len(),
204            date_range: (min_date, max_date),
205            temperature_stats: TemperatureStats {
206                min_temp,
207                max_temp,
208                avg_temp,
209                min_temp_location,
210                max_temp_location,
211            },
212            data_quality: DataQuality {
213                total_records: records.len(),
214                valid_records: valid_count,
215                suspect_records: suspect_count,
216                missing_records: missing_count,
217                complete_records: complete_count,
218            },
219            geographic_bounds: GeographicBounds {
220                min_lat,
221                max_lat,
222                min_lon,
223                max_lon,
224            },
225        })
226    }
227}
228
229impl WeatherStatistics {
230    pub fn summary(&self) -> String {
231        let temp_range = if self.temperature_stats.min_temp.is_nan()
232            || self.temperature_stats.max_temp.is_nan()
233        {
234            "No valid measurements".to_string()
235        } else {
236            format!(
237                "{:.1}°C to {:.1}°C",
238                self.temperature_stats.min_temp, self.temperature_stats.max_temp
239            )
240        };
241
242        format!(
243            "Weather Parameters: Temperature (min/max/avg)\n\
244            Stations: {} stations\n\
245            Date Range: {} to {} ({} years)\n\
246            Records: {} total\n\
247            Data Quality: {:.1}% valid, {:.1}% suspect, {:.1}% missing\n\
248            Temperature Range: {}\n\
249            Coverage: {:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E",
250            self.unique_stations,
251            self.date_range.0,
252            self.date_range.1,
253            (self
254                .date_range
255                .1
256                .signed_duration_since(self.date_range.0)
257                .num_days()
258                / 365),
259            self.total_records,
260            self.data_quality.valid_percentage(),
261            self.data_quality.suspect_percentage(),
262            self.data_quality.missing_percentage(),
263            temp_range,
264            self.geographic_bounds.min_lat,
265            self.geographic_bounds.max_lat,
266            self.geographic_bounds.min_lon.abs(),
267            self.geographic_bounds.max_lon
268        )
269    }
270
271    pub fn detailed_summary(&self) -> String {
272        let coldest = if self.temperature_stats.min_temp.is_nan() {
273            "No valid measurements".to_string()
274        } else {
275            format!(
276                "{:.1}°C at {}",
277                self.temperature_stats.min_temp, self.temperature_stats.min_temp_location
278            )
279        };
280
281        let hottest = if self.temperature_stats.max_temp.is_nan() {
282            "No valid measurements".to_string()
283        } else {
284            format!(
285                "{:.1}°C at {}",
286                self.temperature_stats.max_temp, self.temperature_stats.max_temp_location
287            )
288        };
289
290        let average = if self.temperature_stats.avg_temp.is_nan() {
291            "No valid measurements".to_string()
292        } else {
293            format!("{:.1}°C", self.temperature_stats.avg_temp)
294        };
295
296        format!(
297            "{}\n\n\
298            Extreme Temperatures (valid range only):\n\
299            - Coldest: {}\n\
300            - Hottest: {}\n\
301            - Average: {}\n\n\
302            Data Completeness:\n\
303            - Complete records: {}/{} ({:.1}%)",
304            self.summary(),
305            coldest,
306            hottest,
307            average,
308            self.data_quality.complete_records,
309            self.data_quality.total_records,
310            (self.data_quality.complete_records as f64 / self.data_quality.total_records as f64)
311                * 100.0
312        )
313    }
314}
315
316impl Default for WeatherAnalyzer {
317    fn default() -> Self {
318        Self::new()
319    }
320}