ecad_processor/archive/
processor.rs

1use crate::archive::{
2    ArchiveInspector, ArchiveMetadata, TempFileManager, TemperatureType, WeatherMetric,
3};
4use crate::error::{ProcessingError, Result};
5use crate::models::{StationMetadata, WeatherRecord};
6use crate::processors::{IntegrityReport, StationStatistics, TemperatureViolation, ViolationType};
7use crate::readers::{StationReader, TemperatureReader};
8use chrono::NaiveDate;
9use std::collections::HashMap;
10use std::fs::File;
11use std::io::{BufRead, BufReader};
12use std::path::Path;
13
14pub struct ArchiveProcessor {
15    temp_manager: TempFileManager,
16    archive_metadata: ArchiveMetadata,
17}
18
19impl ArchiveProcessor {
20    pub async fn from_zip(zip_path: &Path) -> Result<Self> {
21        // Inspect the archive to get metadata
22        let archive_metadata = ArchiveInspector::inspect_zip(zip_path)?;
23
24        // Create temporary file manager
25        let temp_manager = TempFileManager::new()?;
26
27        Ok(Self {
28            temp_manager,
29            archive_metadata,
30        })
31    }
32
33    pub fn metadata(&self) -> &ArchiveMetadata {
34        &self.archive_metadata
35    }
36
37    pub async fn process_data(
38        mut self,
39        zip_path: &Path,
40    ) -> Result<(Vec<WeatherRecord>, IntegrityReport)> {
41        // Extract metadata files
42        let metadata_files = self.temp_manager.extract_metadata_files(zip_path)?;
43
44        // Read station metadata
45        let station_map = if let Some(stations_path) = metadata_files.get("stations.txt") {
46            let reader = StationReader::new();
47            reader.read_stations_map(stations_path)?
48        } else {
49            return Err(ProcessingError::InvalidFormat(
50                "stations.txt not found in archive".to_string(),
51            ));
52        };
53
54        println!("Loaded {} stations from metadata", station_map.len());
55
56        // Group weather data by station and date
57        let mut weather_data: HashMap<(u32, NaiveDate), WeatherRecord> = HashMap::new();
58
59        // Process each metric type
60        for metric in &self.archive_metadata.metrics {
61            let pattern = format!("{}_STAID", metric.to_file_prefix());
62            let data_files = self
63                .temp_manager
64                .extract_files_matching_pattern(zip_path, &pattern)?;
65
66            println!(
67                "Processing {} files for metric: {}",
68                data_files.len(),
69                metric
70            );
71
72            // Process files for this metric
73            self.process_metric_files(&data_files, metric, &station_map, &mut weather_data)
74                .await?;
75        }
76
77        // Convert to vector and ensure all records have physical validation
78        let mut all_records: Vec<WeatherRecord> = weather_data.into_values().collect();
79
80        // Ensure all records have physical validation performed after data population
81        for record in &mut all_records {
82            record.perform_physical_validation();
83        }
84
85        let integrity_report = self.calculate_integrity_report(&all_records);
86
87        // Cleanup temporary files
88        self.temp_manager.cleanup()?;
89
90        Ok((all_records, integrity_report))
91    }
92
93    async fn process_metric_files(
94        &self,
95        file_paths: &[std::path::PathBuf],
96        metric: &WeatherMetric,
97        station_map: &HashMap<u32, StationMetadata>,
98        weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
99    ) -> Result<()> {
100        for file_path in file_paths {
101            if let Some(file_name) = file_path.file_name().and_then(|n| n.to_str()) {
102                if let Some(station_id) = extract_station_id_from_filename(file_name) {
103                    // Get station metadata
104                    let station_metadata = station_map.get(&station_id);
105                    if station_metadata.is_none() {
106                        println!("Warning: Station {} not found in metadata", station_id);
107                        continue;
108                    }
109                    let station = station_metadata.unwrap();
110
111                    // Parse weather data based on metric type
112                    match metric {
113                        WeatherMetric::Temperature(temp_type) => {
114                            self.process_temperature_file(
115                                file_path,
116                                station,
117                                temp_type,
118                                weather_data,
119                            )?;
120                        }
121                        WeatherMetric::Precipitation => {
122                            self.process_precipitation_file(file_path, station, weather_data)?;
123                        }
124                        WeatherMetric::WindSpeed => {
125                            self.process_wind_speed_file(file_path, station, weather_data)?;
126                        }
127                    }
128                }
129            }
130        }
131
132        Ok(())
133    }
134
135    pub fn temp_dir_path(&self) -> &Path {
136        self.temp_manager.temp_dir_path()
137    }
138
139    pub fn cleanup(mut self) -> Result<()> {
140        self.temp_manager.cleanup()
141    }
142
143    fn process_temperature_file(
144        &self,
145        file_path: &Path,
146        station: &StationMetadata,
147        temp_type: &TemperatureType,
148        weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
149    ) -> Result<()> {
150        let reader = TemperatureReader::new();
151        let temp_records = reader.read_temperatures_with_station_id(file_path, station.staid)?;
152
153        for temp_record in temp_records {
154            let key = (temp_record.staid, temp_record.date);
155
156            // Get or create weather record for this station/date
157            let weather_record = weather_data.entry(key).or_insert_with(|| {
158                WeatherRecord::builder()
159                    .station_id(station.staid)
160                    .station_name(station.name.clone())
161                    .date(temp_record.date)
162                    .coordinates(station.latitude, station.longitude)
163                    .build()
164                    .unwrap_or_else(|_| {
165                        // Fallback record if builder fails
166                        WeatherRecord::new(
167                            station.staid,
168                            station.name.clone(),
169                            temp_record.date,
170                            station.latitude,
171                            station.longitude,
172                            None,
173                            None,
174                            None,
175                            None,
176                            None,
177                            None,
178                            None,
179                            None,
180                        )
181                    })
182            });
183
184            // Update temperature data based on type
185            match temp_type {
186                TemperatureType::Minimum => {
187                    weather_record.temp_min = Some(temp_record.temperature);
188                }
189                TemperatureType::Maximum => {
190                    weather_record.temp_max = Some(temp_record.temperature);
191                }
192                TemperatureType::Average => {
193                    weather_record.temp_avg = Some(temp_record.temperature);
194                }
195            }
196
197            // Update quality flag (combine multiple flags)
198            let quality_str = temp_record.quality_flag.to_string();
199            if let Some(ref existing) = weather_record.temp_quality {
200                if !existing.contains(&quality_str) {
201                    weather_record.temp_quality = Some(format!("{}{}", existing, quality_str));
202                }
203            } else {
204                weather_record.temp_quality = Some(quality_str);
205            }
206        }
207
208        Ok(())
209    }
210
211    fn process_precipitation_file(
212        &self,
213        file_path: &Path,
214        station: &StationMetadata,
215        weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
216    ) -> Result<()> {
217        let precip_records = self.parse_weather_file(file_path, station.staid)?;
218
219        for (date, value, quality) in precip_records {
220            let key = (station.staid, date);
221
222            let weather_record = weather_data.entry(key).or_insert_with(|| {
223                WeatherRecord::new(
224                    station.staid,
225                    station.name.clone(),
226                    date,
227                    station.latitude,
228                    station.longitude,
229                    None,
230                    None,
231                    None,
232                    None,
233                    None,
234                    None,
235                    None,
236                    None,
237                )
238            });
239
240            weather_record.precipitation = Some(value / 10.0); // Convert from 0.1mm to mm
241            weather_record.precip_quality = Some(quality.to_string());
242        }
243
244        Ok(())
245    }
246
247    fn process_wind_speed_file(
248        &self,
249        file_path: &Path,
250        station: &StationMetadata,
251        weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
252    ) -> Result<()> {
253        let wind_records = self.parse_weather_file(file_path, station.staid)?;
254
255        for (date, value, quality) in wind_records {
256            let key = (station.staid, date);
257
258            let weather_record = weather_data.entry(key).or_insert_with(|| {
259                WeatherRecord::new(
260                    station.staid,
261                    station.name.clone(),
262                    date,
263                    station.latitude,
264                    station.longitude,
265                    None,
266                    None,
267                    None,
268                    None,
269                    None,
270                    None,
271                    None,
272                    None,
273                )
274            });
275
276            weather_record.wind_speed = Some(value / 10.0); // Convert from 0.1 m/s to m/s
277            weather_record.wind_quality = Some(quality.to_string());
278        }
279
280        Ok(())
281    }
282
283    fn parse_weather_file(
284        &self,
285        file_path: &Path,
286        _station_id: u32,
287    ) -> Result<Vec<(NaiveDate, f32, u8)>> {
288        let file = File::open(file_path)?;
289        let reader = BufReader::new(file);
290        let mut records = Vec::new();
291        let mut line_count = 0;
292
293        for line_result in reader.lines() {
294            let line = line_result?;
295            line_count += 1;
296
297            // Skip empty lines
298            if line.trim().is_empty() {
299                continue;
300            }
301
302            // Skip header lines (first 20 lines typically contain headers)
303            if line_count <= 20 {
304                continue;
305            }
306
307            // Parse data line: SOUID, DATE, VALUE, Q_FLAG
308            let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
309            if parts.len() < 4 {
310                continue; // Skip malformed lines
311            }
312
313            // Parse date (YYYYMMDD format)
314            if let Ok(date) = NaiveDate::parse_from_str(parts[1], "%Y%m%d") {
315                // Parse value (skip missing values)
316                if parts[2] != "-9999" {
317                    if let (Ok(value), Ok(quality)) =
318                        (parts[2].parse::<f32>(), parts[3].parse::<u8>())
319                    {
320                        records.push((date, value, quality));
321                    }
322                }
323            }
324        }
325
326        Ok(records)
327    }
328
329    fn calculate_integrity_report(&self, records: &[WeatherRecord]) -> IntegrityReport {
330        let mut valid_records = 0;
331        let mut suspect_records = 0;
332        let mut invalid_records = 0;
333        let mut missing_data_records = 0;
334        let mut temperature_violations = Vec::new();
335        let mut station_statistics: HashMap<u32, StationStatistics> = HashMap::new();
336
337        for record in records {
338            // Check data quality
339            if record.has_valid_temperature_data()
340                && record.has_valid_precipitation_data()
341                && record.has_valid_wind_data()
342            {
343                valid_records += 1;
344            } else if record.has_suspect_data() {
345                suspect_records += 1;
346            } else if record.has_missing_data() {
347                missing_data_records += 1;
348            }
349
350            // Check temperature relationships
351            if let Err(e) = record.validate_relationships() {
352                let violation_type = if e.to_string().contains("Min temperature") {
353                    ViolationType::MinGreaterThanAvg
354                } else if e.to_string().contains("Avg temperature") {
355                    ViolationType::AvgGreaterThanMax
356                } else {
357                    ViolationType::OutOfRange
358                };
359
360                temperature_violations.push(TemperatureViolation {
361                    station_id: record.station_id,
362                    date: record.date,
363                    violation_type,
364                    details: e.to_string(),
365                });
366                invalid_records += 1;
367            }
368
369            // Update station statistics
370            let station_stats = station_statistics.entry(record.station_id).or_default();
371
372            station_stats.total_records += 1;
373
374            if record.has_valid_temperature_data() {
375                station_stats.valid_records += 1;
376            } else if record.has_suspect_data() {
377                station_stats.suspect_records += 1;
378            } else if record.has_missing_data() {
379                station_stats.missing_data_records += 1;
380            }
381
382            // Update temperature statistics
383            if let Some(min_temp) = record.temp_min {
384                station_stats.min_temp = Some(
385                    station_stats
386                        .min_temp
387                        .map_or(min_temp, |curr| curr.min(min_temp)),
388                );
389            }
390            if let Some(max_temp) = record.temp_max {
391                station_stats.max_temp = Some(
392                    station_stats
393                        .max_temp
394                        .map_or(max_temp, |curr| curr.max(max_temp)),
395                );
396            }
397            if let Some(avg_temp) = record.temp_avg {
398                station_stats.avg_temp = Some(
399                    station_stats
400                        .avg_temp
401                        .map_or(avg_temp, |curr| (curr + avg_temp) / 2.0),
402                );
403            }
404        }
405
406        IntegrityReport {
407            total_records: records.len(),
408            valid_records,
409            suspect_records,
410            invalid_records,
411            missing_data_records,
412            temperature_violations,
413            station_statistics,
414        }
415    }
416}
417
418impl Drop for ArchiveProcessor {
419    fn drop(&mut self) {
420        if let Err(e) = self.temp_manager.cleanup() {
421            eprintln!("Warning: Failed to cleanup archive processor: {}", e);
422        }
423    }
424}
425
/// Extracts the numeric station ID from a data file name such as
/// `TX_STAID000257.txt`.
///
/// The ID is the text between the first occurrence of `STAID` and the next
/// `.`; leading zeros are accepted, so `STAID000257` yields `257` and an
/// all-zero ID such as `STAID000000` yields `0`.
///
/// Returns `None` when the name has no `STAID` marker, has no `.` after the
/// marker, or the text in between is not a valid `u32`.
fn extract_station_id_from_filename(file_name: &str) -> Option<u32> {
    let (_, after_marker) = file_name.split_once("STAID")?;
    let (id_text, _) = after_marker.split_once('.')?;
    // `u32::from_str` already handles leading zeros; stripping them by hand
    // would turn an all-zero ID into an empty, unparsable string.
    id_text.parse().ok()
}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Two well-formed names yield their numeric ID; a name without the
    /// `STAID` marker yields `None`.
    #[test]
    fn test_extract_station_id_from_filename() {
        let cases = [
            ("TX_STAID000257.txt", Some(257)),
            ("RR_STAID001234.txt", Some(1234)),
            ("invalid_file.txt", None),
        ];

        for (file_name, expected) in cases {
            assert_eq!(extract_station_id_from_filename(file_name), expected);
        }
    }
}