use crate::archive::{ArchiveInspector, ArchiveProcessor, WeatherMetric};
use crate::error::{ProcessingError, Result};
use crate::models::WeatherRecord;
use crate::processors::IntegrityReport;
use chrono::NaiveDate;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use tokio::task::JoinSet;

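/// Metadata about a single zip archive discovered on disk, collected by
/// inspecting the archive before any records are processed.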
#[derive(Debug, Clone)]
pub struct ArchiveInfo {
    pub path: PathBuf,
    pub metrics: Vec<WeatherMetric>,
    pub station_count: usize,
    pub file_count: usize,
}

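/// Describes the merged dataset: how many records exist in total and how many
/// carry each metric.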
#[derive(Debug, Clone)]
pub struct DatasetComposition {
    pub total_records: usize,
    pub records_with_temperature: usize,
    pub records_with_precipitation: usize,
    pub records_with_wind_speed: usize,
    pub available_metrics: Vec<String>,
}

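/// Discovers weather-data zip archives in a directory, processes them
/// concurrently, and merges their records into one dataset keyed by station
/// and date.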
pub struct MultiArchiveProcessor {
    archives: Vec<ArchiveInfo>,
    max_workers: usize,
}

impl MultiArchiveProcessor {
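    /// Scans `dir_path` for `.zip` files, optionally keeping only filenames
    /// that contain `file_pattern`, and inspects every candidate archive.
    ///
    /// A minimal usage sketch (the directory path is illustrative):
    ///
    /// ```ignore
    /// let processor =
    ///     MultiArchiveProcessor::from_directory(Path::new("data/archives"), Some("UK_ALL_"), 4)
    ///         .await?;
    /// println!("{}", processor.get_summary());
    /// ```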
    pub async fn from_directory(
        dir_path: &Path,
        file_pattern: Option<&str>,
        max_workers: usize,
    ) -> Result<Self> {
        if !dir_path.is_dir() {
            return Err(ProcessingError::InvalidFormat(format!(
                "Path is not a directory: {}",
                dir_path.display()
            )));
        }

        let mut archives = Vec::new();

        let entries = fs::read_dir(dir_path)?;

        for entry in entries {
            let entry = entry?;
            let path = entry.path();

            if !path.is_file() || path.extension().is_none_or(|ext| ext != "zip") {
                continue;
            }

            if let Some(pattern) = file_pattern {
                if !pattern.is_empty() {
                    if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
                        if !filename.contains(pattern) {
                            continue;
                        }
                    } else {
                        continue;
                    }
                }
            }

            println!("Inspecting archive: {}", path.display());

            match ArchiveInspector::inspect_zip(&path) {
                Ok(metadata) => {
                    let archive_info = ArchiveInfo {
                        path: path.clone(),
                        metrics: metadata.metrics,
                        station_count: metadata.station_count,
                        file_count: metadata.total_files,
                    };

                    println!(
                        " → Found {} metrics across {} stations in {} files",
                        archive_info.metrics.len(),
                        archive_info.station_count,
                        archive_info.file_count
                    );

                    archives.push(archive_info);
                }
                Err(e) => {
                    println!(" → Warning: Failed to inspect {}: {}", path.display(), e);
                    continue;
                }
            }
        }

        if archives.is_empty() {
            return Err(ProcessingError::InvalidFormat(format!(
                "No valid zip files found in directory: {}",
                dir_path.display()
            )));
        }

        archives.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));

        println!("\nFound {} archives to process:", archives.len());
        for archive in &archives {
            println!(
                " • {} ({} metrics)",
                archive.path.file_name().unwrap().to_string_lossy(),
                archive.metrics.len()
            );
        }

        Ok(Self {
            archives,
            max_workers,
        })
    }

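    /// Builds a human-readable summary of the discovered archives, listing how
    /// many archives carry each unique metric.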
    pub fn get_summary(&self) -> String {
        let total_files = self.archives.iter().map(|a| a.file_count).sum::<usize>();
        let total_metrics: Vec<_> = self.archives.iter().flat_map(|a| &a.metrics).collect();
        let unique_metrics: std::collections::HashSet<_> = total_metrics.iter().collect();

        let mut summary = format!(
            "Multi-Archive Summary:\n Archives: {}\n Total Files: {}\n Unique Metrics: {}\n",
            self.archives.len(),
            total_files,
            unique_metrics.len()
        );

        summary.push_str(" Available Metrics:\n");
        for metric in unique_metrics {
            let archive_count = self
                .archives
                .iter()
                .filter(|a| a.metrics.contains(metric))
                .count();
            summary.push_str(&format!(" {}: {} archives\n", metric, archive_count));
        }

        summary
    }

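    /// Processes every archive concurrently, merges the per-archive records by
    /// `(station_id, date)`, and combines the integrity reports.
    ///
    /// A minimal call sketch (`None` keeps all stations):
    ///
    /// ```ignore
    /// let (records, report, composition) = processor.process_unified_data(None).await?;
    /// ```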
    pub async fn process_unified_data(
        mut self,
        station_filter: Option<u32>,
    ) -> Result<(Vec<WeatherRecord>, IntegrityReport, DatasetComposition)> {
        println!(
            "Processing {} archives with up to {} workers...",
            self.archives.len(),
            self.max_workers
        );

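        // Note: all archive tasks are spawned onto the runtime immediately;
        // `max_workers` is reported here but does not itself bound concurrency
        // (that would require e.g. a `tokio::sync::Semaphore`).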
        let mut join_set = JoinSet::new();
        let archives = std::mem::take(&mut self.archives);

        for archive_info in archives {
            join_set.spawn(async move {
                println!("Starting processing: {}", archive_info.path.display());

                let processor = ArchiveProcessor::from_zip(&archive_info.path).await?;
                let (records, report) = processor.process_data(&archive_info.path).await?;

                let filtered_records = if let Some(station_id) = station_filter {
                    records
                        .into_iter()
                        .filter(|r| r.station_id == station_id)
                        .collect()
                } else {
                    records
                };

                println!(
                    "Completed processing: {} ({} records)",
                    archive_info.path.file_name().unwrap().to_string_lossy(),
                    filtered_records.len()
                );

                Ok::<(Vec<WeatherRecord>, IntegrityReport), ProcessingError>((
                    filtered_records,
                    report,
                ))
            });
        }

        let mut all_records_by_archive = Vec::new();
        let mut all_reports = Vec::new();

        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(Ok((records, report))) => {
                    all_records_by_archive.push(records);
                    all_reports.push(report);
                }
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(ProcessingError::TaskJoin(e)),
            }
        }

        println!("All archives processed. Merging unified records...");

        let (unified_records, composition) = self.merge_records_by_key(all_records_by_archive)?;

        println!("Created {} unified weather records", unified_records.len());

        let combined_report = self.combine_integrity_reports(all_reports);

        Ok((unified_records, combined_report, composition))
    }

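    /// Folds per-archive record lists into one deduplicated list: records that
    /// share a `(station_id, date)` key are merged field by field, the result
    /// is sorted, and the dataset composition is computed alongside.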
    fn merge_records_by_key(
        &self,
        records_by_archive: Vec<Vec<WeatherRecord>>,
    ) -> Result<(Vec<WeatherRecord>, DatasetComposition)> {
        let mut record_map: HashMap<(u32, NaiveDate), WeatherRecord> = HashMap::new();

        for archive_records in records_by_archive {
            for record in archive_records {
                let key = (record.station_id, record.date);

                match record_map.entry(key) {
                    Entry::Occupied(entry) => {
                        Self::merge_weather_records(entry.into_mut(), record)?;
                    }
                    Entry::Vacant(entry) => {
                        entry.insert(record);
                    }
                }
            }
        }

        let mut unified_records: Vec<_> = record_map.into_values().collect();

        for record in &mut unified_records {
            record.perform_physical_validation();
        }

        unified_records.sort_by(|a, b| {
            a.station_id
                .cmp(&b.station_id)
                .then_with(|| a.date.cmp(&b.date))
        });

        let total_records = unified_records.len();
        let records_with_temperature = unified_records
            .iter()
            .filter(|r| r.has_temperature_data())
            .count();
        let records_with_precipitation = unified_records
            .iter()
            .filter(|r| r.has_precipitation())
            .count();
        let records_with_wind_speed = unified_records
            .iter()
            .filter(|r| r.has_wind_speed())
            .count();

        let mut available_metrics = Vec::new();
        if records_with_temperature > 0 {
            available_metrics.push("temperature".to_string());
        }
        if records_with_precipitation > 0 {
            available_metrics.push("precipitation".to_string());
        }
        if records_with_wind_speed > 0 {
            available_metrics.push("wind_speed".to_string());
        }

        let composition = DatasetComposition {
            total_records,
            records_with_temperature,
            records_with_precipitation,
            records_with_wind_speed,
            available_metrics,
        };

        Ok((unified_records, composition))
    }

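    /// Merges `source` into `target` for the same station and date: any field
    /// present in `source` overwrites the corresponding field in `target`,
    /// while `None` fields leave `target` untouched.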
    fn merge_weather_records(target: &mut WeatherRecord, source: WeatherRecord) -> Result<()> {
        if target.station_id != source.station_id || target.date != source.date {
            return Err(ProcessingError::InvalidFormat(format!(
                "Cannot merge records: station/date mismatch ({}/{} vs {}/{})",
                target.station_id, target.date, source.station_id, source.date
            )));
        }

        if source.temp_min.is_some() {
            target.temp_min = source.temp_min;
        }
        if source.temp_max.is_some() {
            target.temp_max = source.temp_max;
        }
        if source.temp_avg.is_some() {
            target.temp_avg = source.temp_avg;
        }

        if source.precipitation.is_some() {
            target.precipitation = source.precipitation;
        }

        if source.wind_speed.is_some() {
            target.wind_speed = source.wind_speed;
        }

        if source.temp_quality.is_some() {
            target.temp_quality = source.temp_quality;
        }
        if source.precip_quality.is_some() {
            target.precip_quality = source.precip_quality;
        }
        if source.wind_quality.is_some() {
            target.wind_quality = source.wind_quality;
        }

        target.perform_physical_validation();

        Ok(())
    }

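    /// Sums the counters of the per-archive integrity reports into one
    /// combined report and concatenates their violation lists.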
    fn combine_integrity_reports(&self, reports: Vec<IntegrityReport>) -> IntegrityReport {
        let mut combined = IntegrityReport {
            total_records: 0,
            valid_records: 0,
            suspect_records: 0,
            invalid_records: 0,
            missing_data_records: 0,
            temperature_violations: Vec::new(),
            station_statistics: HashMap::new(),
        };

        for report in reports {
            combined.total_records += report.total_records;
            combined.valid_records += report.valid_records;
            combined.suspect_records += report.suspect_records;
            combined.invalid_records += report.invalid_records;
            combined.missing_data_records += report.missing_data_records;

            combined
                .temperature_violations
                .extend(report.temperature_violations);

            for (station_id, stats) in report.station_statistics {
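                // Last writer wins: if the same station appears in several
                // reports, its statistics from the report processed last
                // replace any earlier entry.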
                combined.station_statistics.insert(station_id, stats);
            }
        }

        combined
    }

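    /// Returns the paths of all discovered archives.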
    pub fn archive_paths(&self) -> Vec<&Path> {
        self.archives.iter().map(|a| a.path.as_path()).collect()
    }

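    /// Returns the number of discovered archives.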
    pub fn archive_count(&self) -> usize {
        self.archives.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use tempfile::TempDir;

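    /// Builds a temp directory containing empty placeholder `.zip` files plus
    /// one non-zip file. The zips are zero bytes, so they are not valid
    /// archives and inspection is expected to fail for each of them.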
    fn create_test_directory() -> Result<TempDir> {
        let temp_dir = TempDir::new()?;

        File::create(temp_dir.path().join("UK_ALL_TEMP_MIN.zip"))?;
        File::create(temp_dir.path().join("UK_ALL_TEMP_MAX.zip"))?;
        File::create(temp_dir.path().join("UK_ALL_PRECIPITATION.zip"))?;
        File::create(temp_dir.path().join("OTHER_DATA.zip"))?;
        File::create(temp_dir.path().join("not_a_zip.txt"))?;

        Ok(temp_dir)
    }

    #[tokio::test]
    async fn test_directory_scanning() {
        let temp_dir = create_test_directory().unwrap();

        let result =
            MultiArchiveProcessor::from_directory(temp_dir.path(), Some("UK_ALL_"), 4).await;

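        // The placeholder zips are empty files, so inspection fails for every
        // candidate and `from_directory` returns the "no valid zip files" error.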
        assert!(result.is_err());
    }

    #[test]
    fn test_merge_weather_records() {
        use chrono::NaiveDate;

        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();

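        // `WeatherRecord::new` argument order, as inferred from the assertions
        // below: station_id, station_name, date, latitude, longitude,
        // temp_min, temp_max, temp_avg, precipitation, wind_speed,
        // temp_quality, precip_quality, wind_quality.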
        let mut target = WeatherRecord::new(
            123,
            "Test Station".to_string(),
            date,
            51.5,
            -0.1,
            Some(10.0),
            None,
            None,
            None,
            None,
            Some("0".to_string()),
            None,
            None,
        );

        let source = WeatherRecord::new(
            123,
            "Test Station".to_string(),
            date,
            51.5,
            -0.1,
            None,
            Some(20.0),
            Some(15.0),
            Some(5.5),
            None,
            None,
            Some("0".to_string()),
            None,
        );

        MultiArchiveProcessor::merge_weather_records(&mut target, source).unwrap();

        assert_eq!(target.temp_min, Some(10.0));
        assert_eq!(target.temp_max, Some(20.0));
        assert_eq!(target.temp_avg, Some(15.0));
        assert_eq!(target.precipitation, Some(5.5));
        assert!(target.wind_speed.is_none());
    }
}