1use crate::archive::{
2 ArchiveInspector, ArchiveMetadata, TempFileManager, TemperatureType, WeatherMetric,
3};
4use crate::error::{ProcessingError, Result};
5use crate::models::{StationMetadata, WeatherRecord};
6use crate::processors::{IntegrityReport, StationStatistics, TemperatureViolation, ViolationType};
7use crate::readers::{StationReader, TemperatureReader};
8use chrono::NaiveDate;
9use std::collections::HashMap;
10use std::fs::File;
11use std::io::{BufRead, BufReader};
12use std::path::Path;
13
/// Processes a weather-data zip archive.
///
/// Holds the metadata produced by inspecting the archive together with the
/// manager for the temporary files extracted from it.
pub struct ArchiveProcessor {
    /// Manages the temporary location extracted archive entries are written to.
    /// Cleaned up explicitly by `process_data` / `cleanup` and again on drop.
    temp_manager: TempFileManager,
    /// Result of `ArchiveInspector::inspect_zip`; its `metrics` drive which
    /// data files `process_data` extracts.
    archive_metadata: ArchiveMetadata,
}
18
19impl ArchiveProcessor {
20 pub async fn from_zip(zip_path: &Path) -> Result<Self> {
21 let archive_metadata = ArchiveInspector::inspect_zip(zip_path)?;
23
24 let temp_manager = TempFileManager::new()?;
26
27 Ok(Self {
28 temp_manager,
29 archive_metadata,
30 })
31 }
32
33 pub fn metadata(&self) -> &ArchiveMetadata {
34 &self.archive_metadata
35 }
36
37 pub async fn process_data(
38 mut self,
39 zip_path: &Path,
40 ) -> Result<(Vec<WeatherRecord>, IntegrityReport)> {
41 let metadata_files = self.temp_manager.extract_metadata_files(zip_path)?;
43
44 let station_map = if let Some(stations_path) = metadata_files.get("stations.txt") {
46 let reader = StationReader::new();
47 reader.read_stations_map(stations_path)?
48 } else {
49 return Err(ProcessingError::InvalidFormat(
50 "stations.txt not found in archive".to_string(),
51 ));
52 };
53
54 println!("Loaded {} stations from metadata", station_map.len());
55
56 let mut weather_data: HashMap<(u32, NaiveDate), WeatherRecord> = HashMap::new();
58
59 for metric in &self.archive_metadata.metrics {
61 let pattern = format!("{}_STAID", metric.to_file_prefix());
62 let data_files = self
63 .temp_manager
64 .extract_files_matching_pattern(zip_path, &pattern)?;
65
66 println!(
67 "Processing {} files for metric: {}",
68 data_files.len(),
69 metric
70 );
71
72 self.process_metric_files(&data_files, metric, &station_map, &mut weather_data)
74 .await?;
75 }
76
77 let mut all_records: Vec<WeatherRecord> = weather_data.into_values().collect();
79
80 for record in &mut all_records {
82 record.perform_physical_validation();
83 }
84
85 let integrity_report = self.calculate_integrity_report(&all_records);
86
87 self.temp_manager.cleanup()?;
89
90 Ok((all_records, integrity_report))
91 }
92
93 async fn process_metric_files(
94 &self,
95 file_paths: &[std::path::PathBuf],
96 metric: &WeatherMetric,
97 station_map: &HashMap<u32, StationMetadata>,
98 weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
99 ) -> Result<()> {
100 for file_path in file_paths {
101 if let Some(file_name) = file_path.file_name().and_then(|n| n.to_str()) {
102 if let Some(station_id) = extract_station_id_from_filename(file_name) {
103 let station_metadata = station_map.get(&station_id);
105 if station_metadata.is_none() {
106 println!("Warning: Station {} not found in metadata", station_id);
107 continue;
108 }
109 let station = station_metadata.unwrap();
110
111 match metric {
113 WeatherMetric::Temperature(temp_type) => {
114 self.process_temperature_file(
115 file_path,
116 station,
117 temp_type,
118 weather_data,
119 )?;
120 }
121 WeatherMetric::Precipitation => {
122 self.process_precipitation_file(file_path, station, weather_data)?;
123 }
124 WeatherMetric::WindSpeed => {
125 self.process_wind_speed_file(file_path, station, weather_data)?;
126 }
127 }
128 }
129 }
130 }
131
132 Ok(())
133 }
134
135 pub fn temp_dir_path(&self) -> &Path {
136 self.temp_manager.temp_dir_path()
137 }
138
139 pub fn cleanup(mut self) -> Result<()> {
140 self.temp_manager.cleanup()
141 }
142
143 fn process_temperature_file(
144 &self,
145 file_path: &Path,
146 station: &StationMetadata,
147 temp_type: &TemperatureType,
148 weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
149 ) -> Result<()> {
150 let reader = TemperatureReader::new();
151 let temp_records = reader.read_temperatures_with_station_id(file_path, station.staid)?;
152
153 for temp_record in temp_records {
154 let key = (temp_record.staid, temp_record.date);
155
156 let weather_record = weather_data.entry(key).or_insert_with(|| {
158 WeatherRecord::builder()
159 .station_id(station.staid)
160 .station_name(station.name.clone())
161 .date(temp_record.date)
162 .coordinates(station.latitude, station.longitude)
163 .build()
164 .unwrap_or_else(|_| {
165 WeatherRecord::new(
167 station.staid,
168 station.name.clone(),
169 temp_record.date,
170 station.latitude,
171 station.longitude,
172 None,
173 None,
174 None,
175 None,
176 None,
177 None,
178 None,
179 None,
180 )
181 })
182 });
183
184 match temp_type {
186 TemperatureType::Minimum => {
187 weather_record.temp_min = Some(temp_record.temperature);
188 }
189 TemperatureType::Maximum => {
190 weather_record.temp_max = Some(temp_record.temperature);
191 }
192 TemperatureType::Average => {
193 weather_record.temp_avg = Some(temp_record.temperature);
194 }
195 }
196
197 let quality_str = temp_record.quality_flag.to_string();
199 if let Some(ref existing) = weather_record.temp_quality {
200 if !existing.contains(&quality_str) {
201 weather_record.temp_quality = Some(format!("{}{}", existing, quality_str));
202 }
203 } else {
204 weather_record.temp_quality = Some(quality_str);
205 }
206 }
207
208 Ok(())
209 }
210
211 fn process_precipitation_file(
212 &self,
213 file_path: &Path,
214 station: &StationMetadata,
215 weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
216 ) -> Result<()> {
217 let precip_records = self.parse_weather_file(file_path, station.staid)?;
218
219 for (date, value, quality) in precip_records {
220 let key = (station.staid, date);
221
222 let weather_record = weather_data.entry(key).or_insert_with(|| {
223 WeatherRecord::new(
224 station.staid,
225 station.name.clone(),
226 date,
227 station.latitude,
228 station.longitude,
229 None,
230 None,
231 None,
232 None,
233 None,
234 None,
235 None,
236 None,
237 )
238 });
239
240 weather_record.precipitation = Some(value / 10.0); weather_record.precip_quality = Some(quality.to_string());
242 }
243
244 Ok(())
245 }
246
247 fn process_wind_speed_file(
248 &self,
249 file_path: &Path,
250 station: &StationMetadata,
251 weather_data: &mut HashMap<(u32, NaiveDate), WeatherRecord>,
252 ) -> Result<()> {
253 let wind_records = self.parse_weather_file(file_path, station.staid)?;
254
255 for (date, value, quality) in wind_records {
256 let key = (station.staid, date);
257
258 let weather_record = weather_data.entry(key).or_insert_with(|| {
259 WeatherRecord::new(
260 station.staid,
261 station.name.clone(),
262 date,
263 station.latitude,
264 station.longitude,
265 None,
266 None,
267 None,
268 None,
269 None,
270 None,
271 None,
272 None,
273 )
274 });
275
276 weather_record.wind_speed = Some(value / 10.0); weather_record.wind_quality = Some(quality.to_string());
278 }
279
280 Ok(())
281 }
282
283 fn parse_weather_file(
284 &self,
285 file_path: &Path,
286 _station_id: u32,
287 ) -> Result<Vec<(NaiveDate, f32, u8)>> {
288 let file = File::open(file_path)?;
289 let reader = BufReader::new(file);
290 let mut records = Vec::new();
291 let mut line_count = 0;
292
293 for line_result in reader.lines() {
294 let line = line_result?;
295 line_count += 1;
296
297 if line.trim().is_empty() {
299 continue;
300 }
301
302 if line_count <= 20 {
304 continue;
305 }
306
307 let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
309 if parts.len() < 4 {
310 continue; }
312
313 if let Ok(date) = NaiveDate::parse_from_str(parts[1], "%Y%m%d") {
315 if parts[2] != "-9999" {
317 if let (Ok(value), Ok(quality)) =
318 (parts[2].parse::<f32>(), parts[3].parse::<u8>())
319 {
320 records.push((date, value, quality));
321 }
322 }
323 }
324 }
325
326 Ok(records)
327 }
328
329 fn calculate_integrity_report(&self, records: &[WeatherRecord]) -> IntegrityReport {
330 let mut valid_records = 0;
331 let mut suspect_records = 0;
332 let mut invalid_records = 0;
333 let mut missing_data_records = 0;
334 let mut temperature_violations = Vec::new();
335 let mut station_statistics: HashMap<u32, StationStatistics> = HashMap::new();
336
337 for record in records {
338 if record.has_valid_temperature_data()
340 && record.has_valid_precipitation_data()
341 && record.has_valid_wind_data()
342 {
343 valid_records += 1;
344 } else if record.has_suspect_data() {
345 suspect_records += 1;
346 } else if record.has_missing_data() {
347 missing_data_records += 1;
348 }
349
350 if let Err(e) = record.validate_relationships() {
352 let violation_type = if e.to_string().contains("Min temperature") {
353 ViolationType::MinGreaterThanAvg
354 } else if e.to_string().contains("Avg temperature") {
355 ViolationType::AvgGreaterThanMax
356 } else {
357 ViolationType::OutOfRange
358 };
359
360 temperature_violations.push(TemperatureViolation {
361 station_id: record.station_id,
362 date: record.date,
363 violation_type,
364 details: e.to_string(),
365 });
366 invalid_records += 1;
367 }
368
369 let station_stats = station_statistics
371 .entry(record.station_id)
372 .or_insert_with(|| StationStatistics::default());
373
374 station_stats.total_records += 1;
375
376 if record.has_valid_temperature_data() {
377 station_stats.valid_records += 1;
378 } else if record.has_suspect_data() {
379 station_stats.suspect_records += 1;
380 } else if record.has_missing_data() {
381 station_stats.missing_data_records += 1;
382 }
383
384 if let Some(min_temp) = record.temp_min {
386 station_stats.min_temp = Some(
387 station_stats
388 .min_temp
389 .map_or(min_temp, |curr| curr.min(min_temp)),
390 );
391 }
392 if let Some(max_temp) = record.temp_max {
393 station_stats.max_temp = Some(
394 station_stats
395 .max_temp
396 .map_or(max_temp, |curr| curr.max(max_temp)),
397 );
398 }
399 if let Some(avg_temp) = record.temp_avg {
400 station_stats.avg_temp = Some(
401 station_stats
402 .avg_temp
403 .map_or(avg_temp, |curr| (curr + avg_temp) / 2.0),
404 );
405 }
406 }
407
408 IntegrityReport {
409 total_records: records.len(),
410 valid_records,
411 suspect_records,
412 invalid_records,
413 missing_data_records,
414 temperature_violations,
415 station_statistics,
416 }
417 }
418}
419
420impl Drop for ArchiveProcessor {
421 fn drop(&mut self) {
422 if let Err(e) = self.temp_manager.cleanup() {
423 eprintln!("Warning: Failed to cleanup archive processor: {}", e);
424 }
425 }
426}
427
/// Extracts the numeric station id from a data file name such as
/// `TX_STAID000257.txt` (yielding `257`).
///
/// The id is the text between the first `STAID` marker and the next `.`.
/// Leading zeros are handled by the integer parser itself, so an all-zero id
/// like `STAID000000.txt` yields `Some(0)` instead of failing.
///
/// Returns `None` if the marker or the terminating `.` is missing, or if the
/// id is not a valid `u32`.
fn extract_station_id_from_filename(file_name: &str) -> Option<u32> {
    const MARKER: &str = "STAID";

    let id_start = file_name.find(MARKER)? + MARKER.len();
    let after_marker = &file_name[id_start..];
    let id_end = after_marker.find('.')?;
    after_marker[..id_end].parse().ok()
}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Station ids are read from the `STAID` segment of data file names, and
    /// names without that segment yield no id.
    #[test]
    fn test_extract_station_id_from_filename() {
        let cases: [(&str, Option<u32>); 3] = [
            ("TX_STAID000257.txt", Some(257)),
            ("RR_STAID001234.txt", Some(1234)),
            ("invalid_file.txt", None),
        ];

        for &(file_name, expected) in cases.iter() {
            assert_eq!(extract_station_id_from_filename(file_name), expected);
        }
    }
}