ecad_processor/cli/commands.rs

1use crate::analyzers::WeatherAnalyzer;
2use crate::archive::{ArchiveProcessor, MultiArchiveProcessor};
3use crate::cli::args::{Cli, Commands};
4use crate::error::Result;
5use crate::processors::IntegrityChecker;
6use crate::utils::progress::ProgressReporter;
7use crate::utils::{generate_default_parquet_filename, generate_default_unified_parquet_filename};
8use crate::writers::{ParquetWriter, SchemaType};
9
10pub async fn run(cli: Cli) -> Result<()> {
11    // Initialize logging if verbose
12    if cli.verbose {
13        println!("Verbose logging enabled");
14    }
15
16    match cli.command {
17        Commands::Process {
18            input_archive,
19            output_file,
20            compression: _,
21            station_id,
22            validate_only,
23            max_workers,
24            chunk_size,
25        } => {
26            println!("Processing weather data from archive...");
27            println!("Input archive: {}", input_archive.display());
28
29            // Use default filename if not specified
30            let output_file = output_file.unwrap_or_else(generate_default_parquet_filename);
31
32            println!("Output file: {}", output_file.display());
33            println!("Workers: {}, Chunk size: {}", max_workers, chunk_size);
34
35            let progress = ProgressReporter::new_spinner("Inspecting archive...", false);
36
37            // Create archive processor
38            let processor = ArchiveProcessor::from_zip(&input_archive).await?;
39
40            // Display archive metadata
41            println!("\n{}", processor.metadata().display_summary());
42
43            progress.set_message("Processing data...");
44
45            // Process data
46            let (records, integrity_report) = processor.process_data(&input_archive).await?;
47
48            progress.finish_with_message(&format!("Processed {} records", records.len()));
49
50            // Print integrity report
51            let checker = IntegrityChecker::new();
52            println!("\n{}", checker.generate_summary(&integrity_report));
53
54            if validate_only {
55                println!("Validation complete - no output file written");
56                return Ok(());
57            }
58
59            // Filter by station if specified
60            let filtered_records = if let Some(id) = station_id {
61                records.into_iter().filter(|r| r.station_id == id).collect()
62            } else {
63                records
64            };
65
66            if filtered_records.is_empty() {
67                println!("No records to write");
68                return Ok(());
69            }
70
71            // Write to Parquet
72            println!(
73                "Writing {} records to Parquet file...",
74                filtered_records.len()
75            );
76
77            // Create parent directory if it doesn't exist
78            if let Some(parent) = output_file.parent() {
79                std::fs::create_dir_all(parent)?;
80            }
81
82            let writer = ParquetWriter::new();
83            writer.write_weather_records_batched(&filtered_records, &output_file, 10000)?;
84
85            println!(
86                "Successfully wrote {} weather records to {}",
87                filtered_records.len(),
88                output_file.display()
89            );
90
91            if !filtered_records.is_empty() {
92                let sample_record = &filtered_records[0];
93                println!(
94                    "Sample record: Station {} on {}",
95                    sample_record.station_id, sample_record.date
96                );
97                println!("Available metrics: {:?}", sample_record.available_metrics());
98            }
99
100            println!("Processing complete!");
101        }
102
103        Commands::ProcessDirectory {
104            input_dir,
105            output_file,
106            compression: _compression,
107            station_id,
108            validate_only,
109            max_workers: _max_workers,
110            chunk_size: _chunk_size,
111            file_pattern,
112        } => {
113            println!("Processing weather data from directory...");
114            println!("Input directory: {}", input_dir.display());
115
116            // Use default unified filename if not specified
117            let output_file = output_file.unwrap_or_else(generate_default_unified_parquet_filename);
118
119            println!("Output file: {}", output_file.display());
120
121            if !file_pattern.is_empty() {
122                println!("File pattern filter: '{}'", file_pattern);
123            }
124
125            let progress =
126                ProgressReporter::new_spinner("Scanning directory for archives...", false);
127
128            // Create multi-archive processor
129            let pattern = if file_pattern.is_empty() {
130                None
131            } else {
132                Some(file_pattern.as_str())
133            };
134            let processor = MultiArchiveProcessor::from_directory(&input_dir, pattern, 4).await?;
135
136            // Display archive summary
137            println!("\n{}", processor.get_summary());
138
139            progress.set_message("Processing all archives...");
140
141            // Process unified data
142            let (records, integrity_report, composition) =
143                processor.process_unified_data(station_id).await?;
144
145            progress.finish_with_message(&format!("Processed {} unified records", records.len()));
146
147            // Print integrity report
148            let checker = IntegrityChecker::new();
149            println!("\n{}", checker.generate_summary(&integrity_report));
150
151            if validate_only {
152                println!("Validation complete - no output file written");
153                return Ok(());
154            }
155
156            // Filter records if needed (already done in processor, but for consistency)
157            let filtered_records = records;
158
159            if filtered_records.is_empty() {
160                println!("No records to write");
161                return Ok(());
162            }
163
164            // Write to Parquet
165            println!(
166                "Writing {} unified records to Parquet file...",
167                filtered_records.len()
168            );
169
170            // Create parent directory if it doesn't exist
171            if let Some(parent) = output_file.parent() {
172                std::fs::create_dir_all(parent)?;
173            }
174
175            let writer = ParquetWriter::new();
176            writer.write_weather_records_batched(&filtered_records, &output_file, 10000)?;
177
178            println!(
179                "Successfully wrote {} unified weather records to {}",
180                filtered_records.len(),
181                output_file.display()
182            );
183
184            // Display dataset composition based on actual data
185            println!("Dataset Composition:");
186            println!("  Metrics in Parquet: {:?}", composition.available_metrics);
187            println!("  Total records: {}", composition.total_records);
188
189            println!("Metric Coverage:");
190            if composition.records_with_temperature > 0 {
191                println!(
192                    "  Temperature: {}/{} ({:.1}%)",
193                    composition.records_with_temperature,
194                    composition.total_records,
195                    composition.records_with_temperature as f32 / composition.total_records as f32
196                        * 100.0
197                );
198            }
199            if composition.records_with_precipitation > 0 {
200                println!(
201                    "  Precipitation: {}/{} ({:.1}%)",
202                    composition.records_with_precipitation,
203                    composition.total_records,
204                    composition.records_with_precipitation as f32
205                        / composition.total_records as f32
206                        * 100.0
207                );
208            }
209            if composition.records_with_wind_speed > 0 {
210                println!(
211                    "  Wind Speed: {}/{} ({:.1}%)",
212                    composition.records_with_wind_speed,
213                    composition.total_records,
214                    composition.records_with_wind_speed as f32 / composition.total_records as f32
215                        * 100.0
216                );
217            }
218
219            println!("Unified processing complete!");
220        }
221
222        Commands::Validate {
223            input_archive,
224            max_workers: _,
225        } => {
226            println!("Validating weather data from archive...");
227            println!("Input archive: {}", input_archive.display());
228
229            let progress = ProgressReporter::new_spinner("Inspecting archive...", false);
230
231            // Create archive processor
232            let processor = ArchiveProcessor::from_zip(&input_archive).await?;
233
234            // Display archive metadata
235            println!("\n{}", processor.metadata().display_summary());
236
237            progress.set_message("Validating data...");
238
239            let (_records, integrity_report) = processor.process_data(&input_archive).await?;
240
241            progress.finish_with_message("Validation complete");
242
243            let checker = IntegrityChecker::new();
244            println!("\n{}", checker.generate_summary(&integrity_report));
245
246            if integrity_report.temperature_violations.is_empty() {
247                println!("✅ All data passed validation checks");
248            } else {
249                println!(
250                    "⚠️  Found {} validation issues",
251                    integrity_report.temperature_violations.len()
252                );
253            }
254        }
255
256        Commands::Info {
257            file,
258            sample,
259            analysis_limit,
260        } => {
261            println!("Analyzing Parquet file: {}", file.display());
262
263            // Get basic file info
264            let writer = ParquetWriter::new();
265            let file_info = writer.get_file_info(&file)?;
266
267            // Detect schema type
268            let schema_type = writer.detect_schema_type(&file)?;
269            println!("Schema Type: {:?}", schema_type);
270
271            // Show file info
272            println!("\nFile Details:");
273            println!("{}", file_info.summary());
274
275            // Handle analysis based on schema type
276            match schema_type {
277                SchemaType::ConsolidatedRecord => {
278                    // Use old analyzer for consolidated records
279                    let analyzer = WeatherAnalyzer::new();
280                    let weather_stats =
281                        analyzer.analyze_parquet_with_limit(&file, analysis_limit)?;
282                    println!("\n{}", weather_stats.detailed_summary());
283
284                    // Show sample data if requested
285                    if sample > 0 {
286                        println!("\nSample Records (showing {} records):", sample);
287                        match writer.read_sample_records(&file, sample) {
288                            Ok(records) => {
289                                for (i, record) in records.iter().take(sample).enumerate() {
290                                    println!(
291                                        "{}. {} on {}: min={:.1}°C, avg={:.1}°C, max={:.1}°C ({})",
292                                        i + 1,
293                                        record.station_name,
294                                        record.date,
295                                        record.min_temp,
296                                        record.avg_temp,
297                                        record.max_temp,
298                                        record.quality_flags
299                                    );
300                                }
301                            }
302                            Err(e) => println!("Error reading sample data: {}", e),
303                        }
304                    }
305                }
306                SchemaType::WeatherRecord => {
307                    // Use comprehensive weather dataset analysis
308                    match writer.analyze_weather_dataset(&file, sample) {
309                        Ok(dataset_summary) => {
310                            println!("{}", dataset_summary.display_comprehensive_summary());
311                        }
312                        Err(e) => {
313                            println!("Error analyzing weather dataset: {}", e);
314
315                            // Fallback to basic sample display
316                            if sample > 0 {
317                                println!(
318                                    "\nFallback: Sample Weather Records (showing {} records):",
319                                    sample
320                                );
321                                match writer.read_sample_weather_records(&file, sample) {
322                                    Ok(records) => {
323                                        for (i, record) in records.iter().take(sample).enumerate() {
324                                            let mut metrics = Vec::new();
325
326                                            // Build temperature display
327                                            let temp_parts: Vec<String> = [
328                                                record.temp_min.map(|t| format!("min={:.1}°C", t)),
329                                                record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
330                                                record.temp_max.map(|t| format!("max={:.1}°C", t)),
331                                            ]
332                                            .into_iter()
333                                            .flatten()
334                                            .collect();
335
336                                            if !temp_parts.is_empty() {
337                                                metrics.push(format!(
338                                                    "temp({})",
339                                                    temp_parts.join(", ")
340                                                ));
341                                            }
342
343                                            if let Some(precip) = record.precipitation {
344                                                metrics.push(format!("precip={:.1}mm", precip));
345                                            }
346
347                                            if let Some(wind) = record.wind_speed {
348                                                metrics.push(format!("wind={:.1}m/s", wind));
349                                            }
350
351                                            let metrics_str = if metrics.is_empty() {
352                                                "no data".to_string()
353                                            } else {
354                                                metrics.join(", ")
355                                            };
356
357                                            println!(
358                                                "{}. {} on {}: {}",
359                                                i + 1,
360                                                record.station_name,
361                                                record.date,
362                                                metrics_str
363                                            );
364                                        }
365                                    }
366                                    Err(e) => println!("Error reading sample data: {}", e),
367                                }
368                            }
369                        }
370                    }
371                }
372                SchemaType::Unknown => {
373                    println!("\nUnknown schema type. Cannot analyze this Parquet file format.");
374                }
375            }
376        }
377    }
378
379    Ok(())
380}