// ecad_processor/archive/multi_processor.rs

use crate::archive::{ArchiveInspector, ArchiveProcessor, WeatherMetric};
use crate::error::{ProcessingError, Result};
use crate::models::WeatherRecord;
use crate::processors::IntegrityReport;
use chrono::NaiveDate;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

/// Metadata for one discovered zip archive, captured at scan time by
/// `ArchiveInspector::inspect_zip`.
#[derive(Debug, Clone)]
pub struct ArchiveInfo {
    /// Filesystem path to the zip archive.
    pub path: PathBuf,
    /// Weather metrics detected inside the archive.
    pub metrics: Vec<WeatherMetric>,
    /// Number of stations reported by the inspector.
    pub station_count: usize,
    /// Total number of files contained in the archive.
    pub file_count: usize,
}

/// Summary of which metrics are present in the unified dataset produced by
/// merging records from every archive on the `(station_id, date)` key.
#[derive(Debug, Clone)]
pub struct DatasetComposition {
    /// Total number of unified records.
    pub total_records: usize,
    /// Records where `has_temperature_data()` is true.
    pub records_with_temperature: usize,
    /// Records where `has_precipitation()` is true.
    pub records_with_precipitation: usize,
    /// Records where `has_wind_speed()` is true.
    pub records_with_wind_speed: usize,
    /// Names of metrics present at least once: a subset of
    /// "temperature", "precipitation", "wind_speed".
    pub available_metrics: Vec<String>,
}

/// Coordinates concurrent processing of several weather-data zip archives
/// and merges their per-metric records into unified weather records.
pub struct MultiArchiveProcessor {
    /// Archives discovered by `from_directory`, sorted by filename.
    archives: Vec<ArchiveInfo>,
    /// Intended upper bound on concurrently processed archives.
    // NOTE(review): in this version of the code `max_workers` is only
    // printed by `process_unified_data`; it is not enforced as an actual
    // concurrency limit — confirm whether that is intentional.
    max_workers: usize,
}

33impl MultiArchiveProcessor {
34    /// Create a new processor by scanning a directory for zip files
35    pub async fn from_directory(
36        dir_path: &Path,
37        file_pattern: Option<&str>,
38        max_workers: usize,
39    ) -> Result<Self> {
40        if !dir_path.is_dir() {
41            return Err(ProcessingError::InvalidFormat(format!(
42                "Path is not a directory: {}",
43                dir_path.display()
44            )));
45        }
46
47        let mut archives = Vec::new();
48
49        // Read directory entries
50        let entries = fs::read_dir(dir_path)?;
51
52        for entry in entries {
53            let entry = entry?;
54            let path = entry.path();
55
56            // Filter for zip files
57            if !path.is_file() || path.extension().map_or(true, |ext| ext != "zip") {
58                continue;
59            }
60
61            // Apply file pattern filter if specified
62            if let Some(pattern) = file_pattern {
63                if !pattern.is_empty() {
64                    if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
65                        if !filename.contains(pattern) {
66                            continue;
67                        }
68                    } else {
69                        continue;
70                    }
71                }
72            }
73
74            println!("Inspecting archive: {}", path.display());
75
76            // Inspect the archive to get metadata
77            match ArchiveInspector::inspect_zip(&path) {
78                Ok(metadata) => {
79                    let archive_info = ArchiveInfo {
80                        path: path.clone(),
81                        metrics: metadata.metrics,
82                        station_count: metadata.station_count,
83                        file_count: metadata.total_files,
84                    };
85
86                    println!(
87                        "  → Found {} metrics across {} stations in {} files",
88                        archive_info.metrics.len(),
89                        archive_info.station_count,
90                        archive_info.file_count
91                    );
92
93                    archives.push(archive_info);
94                }
95                Err(e) => {
96                    println!("  → Warning: Failed to inspect {}: {}", path.display(), e);
97                    continue;
98                }
99            }
100        }
101
102        if archives.is_empty() {
103            return Err(ProcessingError::InvalidFormat(format!(
104                "No valid zip files found in directory: {}",
105                dir_path.display()
106            )));
107        }
108
109        // Sort archives by filename for consistent processing order
110        archives.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
111
112        println!("\nFound {} archives to process:", archives.len());
113        for archive in &archives {
114            println!(
115                "  • {} ({} metrics)",
116                archive.path.file_name().unwrap().to_string_lossy(),
117                archive.metrics.len()
118            );
119        }
120
121        Ok(Self {
122            archives,
123            max_workers,
124        })
125    }
126
127    /// Get summary of all discovered archives
128    pub fn get_summary(&self) -> String {
129        let total_files = self.archives.iter().map(|a| a.file_count).sum::<usize>();
130        let total_metrics: Vec<_> = self.archives.iter().flat_map(|a| &a.metrics).collect();
131        let unique_metrics: std::collections::HashSet<_> = total_metrics.iter().collect();
132
133        let mut summary = format!(
134            "Multi-Archive Summary:\n  Archives: {}\n  Total Files: {}\n  Unique Metrics: {}\n",
135            self.archives.len(),
136            total_files,
137            unique_metrics.len()
138        );
139
140        summary.push_str("  Available Metrics:\n");
141        for metric in unique_metrics {
142            let archive_count = self
143                .archives
144                .iter()
145                .filter(|a| a.metrics.contains(metric))
146                .count();
147            summary.push_str(&format!("    {}: {} archives\n", metric, archive_count));
148        }
149
150        summary
151    }
152
153    /// Process all archives and merge data into unified records
154    pub async fn process_unified_data(
155        mut self,
156        station_filter: Option<u32>,
157    ) -> Result<(Vec<WeatherRecord>, IntegrityReport, DatasetComposition)> {
158        println!(
159            "Processing {} archives with up to {} workers...",
160            self.archives.len(),
161            self.max_workers
162        );
163
164        // Process archives concurrently
165        let mut join_set = JoinSet::new();
166        let archives = std::mem::take(&mut self.archives);
167
168        for archive_info in archives {
169            join_set.spawn(async move {
170                println!("Starting processing: {}", archive_info.path.display());
171
172                let processor = ArchiveProcessor::from_zip(&archive_info.path).await?;
173                let (records, report) = processor.process_data(&archive_info.path).await?;
174
175                // Filter by station if specified
176                let filtered_records = if let Some(station_id) = station_filter {
177                    records
178                        .into_iter()
179                        .filter(|r| r.station_id == station_id)
180                        .collect()
181                } else {
182                    records
183                };
184
185                println!(
186                    "Completed processing: {} ({} records)",
187                    archive_info.path.file_name().unwrap().to_string_lossy(),
188                    filtered_records.len()
189                );
190
191                Ok::<(Vec<WeatherRecord>, IntegrityReport), ProcessingError>((
192                    filtered_records,
193                    report,
194                ))
195            });
196        }
197
198        // Collect all results
199        let mut all_records_by_archive = Vec::new();
200        let mut all_reports = Vec::new();
201
202        while let Some(result) = join_set.join_next().await {
203            match result {
204                Ok(Ok((records, report))) => {
205                    all_records_by_archive.push(records);
206                    all_reports.push(report);
207                }
208                Ok(Err(e)) => return Err(e),
209                Err(e) => return Err(ProcessingError::TaskJoin(e)),
210            }
211        }
212
213        println!("All archives processed. Merging unified records...");
214
215        // Merge records by station and date
216        let (unified_records, composition) = self.merge_records_by_key(all_records_by_archive)?;
217
218        println!("Created {} unified weather records", unified_records.len());
219
220        // Combine integrity reports
221        let combined_report = self.combine_integrity_reports(all_reports);
222
223        Ok((unified_records, combined_report, composition))
224    }
225
226    /// Merge records from multiple archives by (station_id, date) key
227    fn merge_records_by_key(
228        &self,
229        records_by_archive: Vec<Vec<WeatherRecord>>,
230    ) -> Result<(Vec<WeatherRecord>, DatasetComposition)> {
231        let mut record_map: HashMap<(u32, NaiveDate), WeatherRecord> = HashMap::new();
232
233        for archive_records in records_by_archive {
234            for record in archive_records {
235                let key = (record.station_id, record.date);
236
237                match record_map.get_mut(&key) {
238                    Some(existing) => {
239                        // Merge with existing record
240                        Self::merge_weather_records(existing, record)?;
241                    }
242                    None => {
243                        // Add new record
244                        record_map.insert(key, record);
245                    }
246                }
247            }
248        }
249
250        // Convert to vector and ensure all records have physical validation
251        let mut unified_records: Vec<_> = record_map.into_values().collect();
252
253        // Ensure all records have physical validation performed
254        for record in &mut unified_records {
255            record.perform_physical_validation();
256        }
257
258        unified_records.sort_by(|a, b| {
259            a.station_id
260                .cmp(&b.station_id)
261                .then_with(|| a.date.cmp(&b.date))
262        });
263
264        // Calculate dataset composition
265        let total_records = unified_records.len();
266        let records_with_temperature = unified_records
267            .iter()
268            .filter(|r| r.has_temperature_data())
269            .count();
270        let records_with_precipitation = unified_records
271            .iter()
272            .filter(|r| r.has_precipitation())
273            .count();
274        let records_with_wind_speed = unified_records
275            .iter()
276            .filter(|r| r.has_wind_speed())
277            .count();
278
279        let mut available_metrics = Vec::new();
280        if records_with_temperature > 0 {
281            available_metrics.push("temperature".to_string());
282        }
283        if records_with_precipitation > 0 {
284            available_metrics.push("precipitation".to_string());
285        }
286        if records_with_wind_speed > 0 {
287            available_metrics.push("wind_speed".to_string());
288        }
289
290        let composition = DatasetComposition {
291            total_records,
292            records_with_temperature,
293            records_with_precipitation,
294            records_with_wind_speed,
295            available_metrics,
296        };
297
298        Ok((unified_records, composition))
299    }
300
301    /// Merge data from one weather record into another
302    fn merge_weather_records(target: &mut WeatherRecord, source: WeatherRecord) -> Result<()> {
303        // Verify records are for same station and date
304        if target.station_id != source.station_id || target.date != source.date {
305            return Err(ProcessingError::InvalidFormat(format!(
306                "Cannot merge records: station/date mismatch ({}/{} vs {}/{})",
307                target.station_id, target.date, source.station_id, source.date
308            )));
309        }
310
311        // Merge temperature data (prefer non-null values)
312        if source.temp_min.is_some() {
313            target.temp_min = source.temp_min;
314        }
315        if source.temp_max.is_some() {
316            target.temp_max = source.temp_max;
317        }
318        if source.temp_avg.is_some() {
319            target.temp_avg = source.temp_avg;
320        }
321
322        // Merge precipitation data
323        if source.precipitation.is_some() {
324            target.precipitation = source.precipitation;
325        }
326
327        // Merge wind speed data
328        if source.wind_speed.is_some() {
329            target.wind_speed = source.wind_speed;
330        }
331
332        // Merge quality flags
333        if source.temp_quality.is_some() {
334            target.temp_quality = source.temp_quality;
335        }
336        if source.precip_quality.is_some() {
337            target.precip_quality = source.precip_quality;
338        }
339        if source.wind_quality.is_some() {
340            target.wind_quality = source.wind_quality;
341        }
342
343        // Re-run physical validation after merging data
344        target.perform_physical_validation();
345
346        Ok(())
347    }
348
349    /// Combine multiple integrity reports into one
350    fn combine_integrity_reports(&self, reports: Vec<IntegrityReport>) -> IntegrityReport {
351        let mut combined = IntegrityReport {
352            total_records: 0,
353            valid_records: 0,
354            suspect_records: 0,
355            invalid_records: 0,
356            missing_data_records: 0,
357            temperature_violations: Vec::new(),
358            station_statistics: HashMap::new(),
359        };
360
361        for report in reports {
362            combined.total_records += report.total_records;
363            combined.valid_records += report.valid_records;
364            combined.suspect_records += report.suspect_records;
365            combined.invalid_records += report.invalid_records;
366            combined.missing_data_records += report.missing_data_records;
367
368            // Merge violations
369            combined
370                .temperature_violations
371                .extend(report.temperature_violations);
372
373            // Merge station statistics (basic approach - could be more sophisticated)
374            for (station_id, stats) in report.station_statistics {
375                combined.station_statistics.insert(station_id, stats);
376            }
377        }
378
379        combined
380    }
381
382    /// Get list of archive paths
383    pub fn archive_paths(&self) -> Vec<&Path> {
384        self.archives.iter().map(|a| a.path.as_path()).collect()
385    }
386
387    /// Get total number of archives
388    pub fn archive_count(&self) -> usize {
389        self.archives.len()
390    }
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::TempDir;

400    fn create_test_directory() -> Result<TempDir> {
401        let temp_dir = TempDir::new()?;
402
403        // Create some test zip files
404        File::create(temp_dir.path().join("UK_ALL_TEMP_MIN.zip"))?;
405        File::create(temp_dir.path().join("UK_ALL_TEMP_MAX.zip"))?;
406        File::create(temp_dir.path().join("UK_ALL_PRECIPITATION.zip"))?;
407        File::create(temp_dir.path().join("OTHER_DATA.zip"))?;
408        File::create(temp_dir.path().join("not_a_zip.txt"))?;
409
410        Ok(temp_dir)
411    }
412
413    #[tokio::test]
414    async fn test_directory_scanning() {
415        let temp_dir = create_test_directory().unwrap();
416
417        // This will fail because the zip files are empty, but we can test the scanning logic
418        let result =
419            MultiArchiveProcessor::from_directory(temp_dir.path(), Some("UK_ALL_"), 4).await;
420
421        // Should find 3 UK_ALL_ files and fail on inspection
422        assert!(result.is_err());
423    }
424
425    #[test]
426    fn test_merge_weather_records() {
427        use chrono::NaiveDate;
428
429        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
430
431        let mut target = WeatherRecord::new(
432            123,
433            "Test Station".to_string(),
434            date,
435            51.5,
436            -0.1,
437            Some(10.0), // min temp
438            None,       // max temp
439            None,       // avg temp
440            None,       // precipitation
441            None,       // wind speed
442            Some("0".to_string()),
443            None,
444            None,
445        );
446
447        let source = WeatherRecord::new(
448            123,
449            "Test Station".to_string(),
450            date,
451            51.5,
452            -0.1,
453            None,       // min temp
454            Some(20.0), // max temp
455            Some(15.0), // avg temp
456            Some(5.5),  // precipitation
457            None,       // wind speed
458            None,
459            Some("0".to_string()),
460            None,
461        );
462
463        MultiArchiveProcessor::merge_weather_records(&mut target, source).unwrap();
464
465        assert_eq!(target.temp_min, Some(10.0));
466        assert_eq!(target.temp_max, Some(20.0));
467        assert_eq!(target.temp_avg, Some(15.0));
468        assert_eq!(target.precipitation, Some(5.5));
469        assert!(target.wind_speed.is_none());
470    }
471}