1use crate::analyzers::WeatherAnalyzer;
2use crate::archive::{ArchiveProcessor, MultiArchiveProcessor};
3use crate::cli::args::{Cli, Commands};
4use crate::error::Result;
5use crate::processors::IntegrityChecker;
6use crate::utils::progress::ProgressReporter;
7use crate::utils::{generate_default_parquet_filename, generate_default_unified_parquet_filename};
8use crate::writers::{ParquetWriter, SchemaType};
9
10pub async fn run(cli: Cli) -> Result<()> {
11 if cli.verbose {
13 println!("Verbose logging enabled");
14 }
15
16 match cli.command {
17 Commands::Process {
18 input_archive,
19 output_file,
20 compression: _,
21 station_id,
22 validate_only,
23 max_workers,
24 chunk_size,
25 } => {
26 println!("Processing weather data from archive...");
27 println!("Input archive: {}", input_archive.display());
28
29 let output_file = output_file.unwrap_or_else(generate_default_parquet_filename);
31
32 println!("Output file: {}", output_file.display());
33 println!("Workers: {}, Chunk size: {}", max_workers, chunk_size);
34
35 let progress = ProgressReporter::new_spinner("Inspecting archive...", false);
36
37 let processor = ArchiveProcessor::from_zip(&input_archive).await?;
39
40 println!("\n{}", processor.metadata().display_summary());
42
43 progress.set_message("Processing data...");
44
45 let (records, integrity_report) = processor.process_data(&input_archive).await?;
47
48 progress.finish_with_message(&format!("Processed {} records", records.len()));
49
50 let checker = IntegrityChecker::new();
52 println!("\n{}", checker.generate_summary(&integrity_report));
53
54 if validate_only {
55 println!("Validation complete - no output file written");
56 return Ok(());
57 }
58
59 let filtered_records = if let Some(id) = station_id {
61 records.into_iter().filter(|r| r.station_id == id).collect()
62 } else {
63 records
64 };
65
66 if filtered_records.is_empty() {
67 println!("No records to write");
68 return Ok(());
69 }
70
71 println!(
73 "Writing {} records to Parquet file...",
74 filtered_records.len()
75 );
76
77 if let Some(parent) = output_file.parent() {
79 std::fs::create_dir_all(parent)?;
80 }
81
82 let writer = ParquetWriter::new();
83 writer.write_weather_records_batched(&filtered_records, &output_file, 10000)?;
84
85 println!(
86 "Successfully wrote {} weather records to {}",
87 filtered_records.len(),
88 output_file.display()
89 );
90
91 if !filtered_records.is_empty() {
92 let sample_record = &filtered_records[0];
93 println!(
94 "Sample record: Station {} on {}",
95 sample_record.station_id, sample_record.date
96 );
97 println!("Available metrics: {:?}", sample_record.available_metrics());
98 }
99
100 println!("Processing complete!");
101 }
102
103 Commands::ProcessDirectory {
104 input_dir,
105 output_file,
106 compression: _compression,
107 station_id,
108 validate_only,
109 max_workers: _max_workers,
110 chunk_size: _chunk_size,
111 file_pattern,
112 } => {
113 println!("Processing weather data from directory...");
114 println!("Input directory: {}", input_dir.display());
115
116 let output_file = output_file.unwrap_or_else(generate_default_unified_parquet_filename);
118
119 println!("Output file: {}", output_file.display());
120
121 if !file_pattern.is_empty() {
122 println!("File pattern filter: '{}'", file_pattern);
123 }
124
125 let progress =
126 ProgressReporter::new_spinner("Scanning directory for archives...", false);
127
128 let pattern = if file_pattern.is_empty() {
130 None
131 } else {
132 Some(file_pattern.as_str())
133 };
134 let processor = MultiArchiveProcessor::from_directory(&input_dir, pattern, 4).await?;
135
136 println!("\n{}", processor.get_summary());
138
139 progress.set_message("Processing all archives...");
140
141 let (records, integrity_report, composition) =
143 processor.process_unified_data(station_id).await?;
144
145 progress.finish_with_message(&format!("Processed {} unified records", records.len()));
146
147 let checker = IntegrityChecker::new();
149 println!("\n{}", checker.generate_summary(&integrity_report));
150
151 if validate_only {
152 println!("Validation complete - no output file written");
153 return Ok(());
154 }
155
156 let filtered_records = records;
158
159 if filtered_records.is_empty() {
160 println!("No records to write");
161 return Ok(());
162 }
163
164 println!(
166 "Writing {} unified records to Parquet file...",
167 filtered_records.len()
168 );
169
170 if let Some(parent) = output_file.parent() {
172 std::fs::create_dir_all(parent)?;
173 }
174
175 let writer = ParquetWriter::new();
176 writer.write_weather_records_batched(&filtered_records, &output_file, 10000)?;
177
178 println!(
179 "Successfully wrote {} unified weather records to {}",
180 filtered_records.len(),
181 output_file.display()
182 );
183
184 println!("Dataset Composition:");
186 println!(" Metrics in Parquet: {:?}", composition.available_metrics);
187 println!(" Total records: {}", composition.total_records);
188
189 println!("Metric Coverage:");
190 if composition.records_with_temperature > 0 {
191 println!(
192 " Temperature: {}/{} ({:.1}%)",
193 composition.records_with_temperature,
194 composition.total_records,
195 composition.records_with_temperature as f32 / composition.total_records as f32
196 * 100.0
197 );
198 }
199 if composition.records_with_precipitation > 0 {
200 println!(
201 " Precipitation: {}/{} ({:.1}%)",
202 composition.records_with_precipitation,
203 composition.total_records,
204 composition.records_with_precipitation as f32
205 / composition.total_records as f32
206 * 100.0
207 );
208 }
209 if composition.records_with_wind_speed > 0 {
210 println!(
211 " Wind Speed: {}/{} ({:.1}%)",
212 composition.records_with_wind_speed,
213 composition.total_records,
214 composition.records_with_wind_speed as f32 / composition.total_records as f32
215 * 100.0
216 );
217 }
218
219 println!("Unified processing complete!");
220 }
221
222 Commands::Validate {
223 input_archive,
224 max_workers: _,
225 } => {
226 println!("Validating weather data from archive...");
227 println!("Input archive: {}", input_archive.display());
228
229 let progress = ProgressReporter::new_spinner("Inspecting archive...", false);
230
231 let processor = ArchiveProcessor::from_zip(&input_archive).await?;
233
234 println!("\n{}", processor.metadata().display_summary());
236
237 progress.set_message("Validating data...");
238
239 let (_records, integrity_report) = processor.process_data(&input_archive).await?;
240
241 progress.finish_with_message("Validation complete");
242
243 let checker = IntegrityChecker::new();
244 println!("\n{}", checker.generate_summary(&integrity_report));
245
246 if integrity_report.temperature_violations.is_empty() {
247 println!("✅ All data passed validation checks");
248 } else {
249 println!(
250 "⚠️ Found {} validation issues",
251 integrity_report.temperature_violations.len()
252 );
253 }
254 }
255
256 Commands::Info {
257 file,
258 sample,
259 analysis_limit,
260 } => {
261 println!("Analyzing Parquet file: {}", file.display());
262
263 let writer = ParquetWriter::new();
265 let file_info = writer.get_file_info(&file)?;
266
267 let schema_type = writer.detect_schema_type(&file)?;
269 println!("Schema Type: {:?}", schema_type);
270
271 println!("\nFile Details:");
273 println!("{}", file_info.summary());
274
275 match schema_type {
277 SchemaType::ConsolidatedRecord => {
278 let analyzer = WeatherAnalyzer::new();
280 let weather_stats =
281 analyzer.analyze_parquet_with_limit(&file, analysis_limit)?;
282 println!("\n{}", weather_stats.detailed_summary());
283
284 if sample > 0 {
286 println!("\nSample Records (showing {} records):", sample);
287 match writer.read_sample_records(&file, sample) {
288 Ok(records) => {
289 for (i, record) in records.iter().take(sample).enumerate() {
290 println!(
291 "{}. {} on {}: min={:.1}°C, avg={:.1}°C, max={:.1}°C ({})",
292 i + 1,
293 record.station_name,
294 record.date,
295 record.min_temp,
296 record.avg_temp,
297 record.max_temp,
298 record.quality_flags
299 );
300 }
301 }
302 Err(e) => println!("Error reading sample data: {}", e),
303 }
304 }
305 }
306 SchemaType::WeatherRecord => {
307 match writer.analyze_weather_dataset(&file, sample) {
309 Ok(dataset_summary) => {
310 println!("{}", dataset_summary.display_comprehensive_summary());
311 }
312 Err(e) => {
313 println!("Error analyzing weather dataset: {}", e);
314
315 if sample > 0 {
317 println!(
318 "\nFallback: Sample Weather Records (showing {} records):",
319 sample
320 );
321 match writer.read_sample_weather_records(&file, sample) {
322 Ok(records) => {
323 for (i, record) in records.iter().take(sample).enumerate() {
324 let mut metrics = Vec::new();
325
326 let temp_parts: Vec<String> = [
328 record.temp_min.map(|t| format!("min={:.1}°C", t)),
329 record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
330 record.temp_max.map(|t| format!("max={:.1}°C", t)),
331 ]
332 .into_iter()
333 .flatten()
334 .collect();
335
336 if !temp_parts.is_empty() {
337 metrics.push(format!(
338 "temp({})",
339 temp_parts.join(", ")
340 ));
341 }
342
343 if let Some(precip) = record.precipitation {
344 metrics.push(format!("precip={:.1}mm", precip));
345 }
346
347 if let Some(wind) = record.wind_speed {
348 metrics.push(format!("wind={:.1}m/s", wind));
349 }
350
351 let metrics_str = if metrics.is_empty() {
352 "no data".to_string()
353 } else {
354 metrics.join(", ")
355 };
356
357 println!(
358 "{}. {} on {}: {}",
359 i + 1,
360 record.station_name,
361 record.date,
362 metrics_str
363 );
364 }
365 }
366 Err(e) => println!("Error reading sample data: {}", e),
367 }
368 }
369 }
370 }
371 }
372 SchemaType::Unknown => {
373 println!("\nUnknown schema type. Cannot analyze this Parquet file format.");
374 }
375 }
376 }
377 }
378
379 Ok(())
380}