ecad_processor/writers/parquet_writer.rs

1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, WeatherRecord};
3use crate::utils::constants::DEFAULT_ROW_GROUP_SIZE;
4use arrow::array::*;
5use arrow::datatypes::{DataType, Field, Schema};
6use arrow::record_batch::RecordBatch;
7use chrono::Datelike;
8use parquet::arrow::ArrowWriter;
9use parquet::basic::{Compression, GzipLevel};
10use parquet::file::properties::WriterProperties;
11use std::fs::File;
12use std::path::Path;
13use std::sync::Arc;
14
15pub struct ParquetWriter {
16    compression: Compression,
17    row_group_size: usize,
18}
19
20impl ParquetWriter {
21    pub fn new() -> Self {
22        Self {
23            compression: Compression::SNAPPY,
24            row_group_size: DEFAULT_ROW_GROUP_SIZE,
25        }
26    }
27
28    pub fn with_compression(mut self, compression: &str) -> Result<Self> {
29        self.compression = match compression.to_lowercase().as_str() {
30            "snappy" => Compression::SNAPPY,
31            "gzip" => Compression::GZIP(GzipLevel::default()),
32            "lz4" => Compression::LZ4,
33            "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
34            "none" => Compression::UNCOMPRESSED,
35            _ => {
36                return Err(crate::error::ProcessingError::Config(format!(
37                    "Unsupported compression: {}",
38                    compression
39                )))
40            }
41        };
42        Ok(self)
43    }
44
45    pub fn with_row_group_size(mut self, size: usize) -> Self {
46        self.row_group_size = size;
47        self
48    }
49
50    /// Write consolidated records to Parquet file
51    pub fn write_records(&self, records: &[ConsolidatedRecord], path: &Path) -> Result<()> {
52        if records.is_empty() {
53            return Ok(());
54        }
55
56        let schema = self.create_schema();
57        let batch = self.records_to_batch(records, schema.clone())?;
58
59        let file = File::create(path)?;
60        let props = WriterProperties::builder()
61            .set_compression(self.compression)
62            .set_max_row_group_size(self.row_group_size)
63            .build();
64
65        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
66        writer.write(&batch)?;
67        writer.close()?;
68
69        Ok(())
70    }
71
72    /// Write records in batches for memory efficiency
73    pub fn write_records_batched(
74        &self,
75        records: &[ConsolidatedRecord],
76        path: &Path,
77        batch_size: usize,
78    ) -> Result<()> {
79        if records.is_empty() {
80            return Ok(());
81        }
82
83        let schema = self.create_schema();
84        let file = File::create(path)?;
85        let props = WriterProperties::builder()
86            .set_compression(self.compression)
87            .set_max_row_group_size(self.row_group_size)
88            .build();
89
90        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
91
92        // Write in batches
93        for chunk in records.chunks(batch_size) {
94            let batch = self.records_to_batch(chunk, schema.clone())?;
95            writer.write(&batch)?;
96        }
97
98        writer.close()?;
99        Ok(())
100    }
101
102    /// Create Arrow schema for temperature data
103    fn create_schema(&self) -> Arc<Schema> {
104        let fields = vec![
105            Field::new("station_id", DataType::UInt32, false),
106            Field::new("station_name", DataType::Utf8, false),
107            Field::new("date", DataType::Date32, false),
108            Field::new("latitude", DataType::Float64, false),
109            Field::new("longitude", DataType::Float64, false),
110            Field::new("min_temp", DataType::Float32, false),
111            Field::new("max_temp", DataType::Float32, false),
112            Field::new("avg_temp", DataType::Float32, false),
113            Field::new("quality_flags", DataType::Utf8, false),
114        ];
115
116        Arc::new(Schema::new(fields))
117    }
118
119    /// Convert records to Arrow RecordBatch
120    fn records_to_batch(
121        &self,
122        records: &[ConsolidatedRecord],
123        schema: Arc<Schema>,
124    ) -> Result<RecordBatch> {
125        // Extract data into separate vectors
126        let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
127        let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
128        let dates: Vec<i32> = records.iter().map(|r| r.date.num_days_from_ce()).collect();
129        let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
130        let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
131        let min_temps: Vec<f32> = records.iter().map(|r| r.min_temp).collect();
132        let max_temps: Vec<f32> = records.iter().map(|r| r.max_temp).collect();
133        let avg_temps: Vec<f32> = records.iter().map(|r| r.avg_temp).collect();
134        let quality_flags: Vec<String> = records.iter().map(|r| r.quality_flags.clone()).collect();
135
136        // Create Arrow arrays
137        let station_id_array = Arc::new(UInt32Array::from(station_ids));
138        let station_name_array = Arc::new(StringArray::from(station_names));
139        let date_array = Arc::new(Date32Array::from(dates));
140        let latitude_array = Arc::new(Float64Array::from(latitudes));
141        let longitude_array = Arc::new(Float64Array::from(longitudes));
142        let min_temp_array = Arc::new(Float32Array::from(min_temps));
143        let max_temp_array = Arc::new(Float32Array::from(max_temps));
144        let avg_temp_array = Arc::new(Float32Array::from(avg_temps));
145        let quality_flags_array = Arc::new(StringArray::from(quality_flags));
146
147        // Create record batch
148        let batch = RecordBatch::try_new(
149            schema,
150            vec![
151                station_id_array,
152                station_name_array,
153                date_array,
154                latitude_array,
155                longitude_array,
156                min_temp_array,
157                max_temp_array,
158                avg_temp_array,
159                quality_flags_array,
160            ],
161        )?;
162
163        Ok(batch)
164    }
165
166    /// Read sample records from Parquet file
167    pub fn read_sample_records(
168        &self,
169        path: &Path,
170        limit: usize,
171    ) -> Result<Vec<ConsolidatedRecord>> {
172        use arrow::array::*;
173        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
174
175        let file = File::open(path)?;
176        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
177            .with_batch_size(limit.min(8192))
178            .build()?;
179
180        let mut records = Vec::new();
181        let mut total_read = 0;
182
183        for batch_result in parquet_reader {
184            let batch = batch_result?;
185
186            // Extract arrays from the batch
187            let station_ids = batch
188                .column(0)
189                .as_any()
190                .downcast_ref::<UInt32Array>()
191                .ok_or_else(|| {
192                    crate::error::ProcessingError::Config(
193                        "Invalid station_id column type".to_string(),
194                    )
195                })?;
196            let station_names = batch
197                .column(1)
198                .as_any()
199                .downcast_ref::<StringArray>()
200                .ok_or_else(|| {
201                    crate::error::ProcessingError::Config(
202                        "Invalid station_name column type".to_string(),
203                    )
204                })?;
205            let dates = batch
206                .column(2)
207                .as_any()
208                .downcast_ref::<Date32Array>()
209                .ok_or_else(|| {
210                    crate::error::ProcessingError::Config("Invalid date column type".to_string())
211                })?;
212            let latitudes = batch
213                .column(3)
214                .as_any()
215                .downcast_ref::<Float64Array>()
216                .ok_or_else(|| {
217                    crate::error::ProcessingError::Config(
218                        "Invalid latitude column type".to_string(),
219                    )
220                })?;
221            let longitudes = batch
222                .column(4)
223                .as_any()
224                .downcast_ref::<Float64Array>()
225                .ok_or_else(|| {
226                    crate::error::ProcessingError::Config(
227                        "Invalid longitude column type".to_string(),
228                    )
229                })?;
230            let min_temps = batch
231                .column(5)
232                .as_any()
233                .downcast_ref::<Float32Array>()
234                .ok_or_else(|| {
235                    crate::error::ProcessingError::Config(
236                        "Invalid min_temp column type".to_string(),
237                    )
238                })?;
239            let max_temps = batch
240                .column(6)
241                .as_any()
242                .downcast_ref::<Float32Array>()
243                .ok_or_else(|| {
244                    crate::error::ProcessingError::Config(
245                        "Invalid max_temp column type".to_string(),
246                    )
247                })?;
248            let avg_temps = batch
249                .column(7)
250                .as_any()
251                .downcast_ref::<Float32Array>()
252                .ok_or_else(|| {
253                    crate::error::ProcessingError::Config(
254                        "Invalid avg_temp column type".to_string(),
255                    )
256                })?;
257            let quality_flags = batch
258                .column(8)
259                .as_any()
260                .downcast_ref::<StringArray>()
261                .ok_or_else(|| {
262                    crate::error::ProcessingError::Config(
263                        "Invalid quality_flags column type".to_string(),
264                    )
265                })?;
266
267            // Convert to ConsolidatedRecord objects
268            let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
269
270            for i in 0..batch_records_to_read {
271                let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i))
272                    .ok_or_else(|| {
273                        crate::error::ProcessingError::Config(
274                            "Invalid date in Parquet file".to_string(),
275                        )
276                    })?;
277
278                let record = ConsolidatedRecord::new(
279                    station_ids.value(i),
280                    station_names.value(i).to_string(),
281                    date,
282                    latitudes.value(i),
283                    longitudes.value(i),
284                    min_temps.value(i),
285                    max_temps.value(i),
286                    avg_temps.value(i),
287                    quality_flags.value(i).to_string(),
288                );
289
290                records.push(record);
291                total_read += 1;
292
293                if total_read >= limit {
294                    break;
295                }
296            }
297
298            if total_read >= limit {
299                break;
300            }
301        }
302
303        Ok(records)
304    }
305
306    /// Get file statistics
307    pub fn get_file_info(&self, path: &Path) -> Result<ParquetFileInfo> {
308        use parquet::file::reader::{FileReader, SerializedFileReader};
309        use std::fs::File;
310
311        let file = File::open(path)?;
312        let reader = SerializedFileReader::new(file)?;
313        let metadata = reader.metadata();
314
315        let file_metadata = metadata.file_metadata();
316        let row_groups = metadata.num_row_groups();
317        let total_rows = file_metadata.num_rows();
318        let file_size = std::fs::metadata(path)?.len();
319
320        let mut row_group_sizes = Vec::new();
321        for i in 0..row_groups {
322            let rg_metadata = metadata.row_group(i);
323            row_group_sizes.push(rg_metadata.num_rows());
324        }
325
326        Ok(ParquetFileInfo {
327            total_rows,
328            row_groups: row_groups as i32,
329            row_group_sizes,
330            file_size,
331            compression: self.compression,
332        })
333    }
334
335    /// Write weather records to Parquet file with optional fields
336    pub fn write_weather_records(&self, records: &[WeatherRecord], path: &Path) -> Result<()> {
337        if records.is_empty() {
338            return Ok(());
339        }
340
341        let schema = self.create_weather_schema();
342        let batch = self.weather_records_to_batch(records, schema.clone())?;
343
344        let file = File::create(path)?;
345        let props = WriterProperties::builder()
346            .set_compression(self.compression)
347            .set_max_row_group_size(self.row_group_size)
348            .build();
349
350        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
351        writer.write(&batch)?;
352        writer.close()?;
353
354        Ok(())
355    }
356
357    /// Write weather records in batches for memory efficiency
358    pub fn write_weather_records_batched(
359        &self,
360        records: &[WeatherRecord],
361        path: &Path,
362        batch_size: usize,
363    ) -> Result<()> {
364        if records.is_empty() {
365            return Ok(());
366        }
367
368        let schema = self.create_weather_schema();
369        let file = File::create(path)?;
370        let props = WriterProperties::builder()
371            .set_compression(self.compression)
372            .set_max_row_group_size(self.row_group_size)
373            .build();
374
375        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
376
377        // Write in batches
378        for chunk in records.chunks(batch_size) {
379            let batch = self.weather_records_to_batch(chunk, schema.clone())?;
380            writer.write(&batch)?;
381        }
382
383        writer.close()?;
384        Ok(())
385    }
386
387    /// Create Arrow schema for multi-metric weather data
388    fn create_weather_schema(&self) -> Arc<Schema> {
389        let fields = vec![
390            Field::new("station_id", DataType::UInt32, false),
391            Field::new("station_name", DataType::Utf8, false),
392            Field::new("date", DataType::Date32, false),
393            Field::new("latitude", DataType::Float64, false),
394            Field::new("longitude", DataType::Float64, false),
395            // Optional temperature fields
396            Field::new("temp_min", DataType::Float32, true),
397            Field::new("temp_max", DataType::Float32, true),
398            Field::new("temp_avg", DataType::Float32, true),
399            // Optional precipitation field
400            Field::new("precipitation", DataType::Float32, true),
401            // Optional wind speed field
402            Field::new("wind_speed", DataType::Float32, true),
403            // Quality flag fields (original ECAD)
404            Field::new("temp_quality", DataType::Utf8, true),
405            Field::new("precip_quality", DataType::Utf8, true),
406            Field::new("wind_quality", DataType::Utf8, true),
407            // Physical validation fields
408            Field::new("temp_validation", DataType::Utf8, true),
409            Field::new("precip_validation", DataType::Utf8, true),
410            Field::new("wind_validation", DataType::Utf8, true),
411        ];
412
413        Arc::new(Schema::new(fields))
414    }
415
416    /// Convert weather records to Arrow RecordBatch
417    fn weather_records_to_batch(
418        &self,
419        records: &[WeatherRecord],
420        schema: Arc<Schema>,
421    ) -> Result<RecordBatch> {
422        // Extract data into separate vectors
423        let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
424        let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
425        let dates: Vec<i32> = records.iter().map(|r| r.date.num_days_from_ce()).collect();
426        let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
427        let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
428
429        // Temperature data (optional)
430        let temp_mins: Vec<Option<f32>> = records.iter().map(|r| r.temp_min).collect();
431        let temp_maxs: Vec<Option<f32>> = records.iter().map(|r| r.temp_max).collect();
432        let temp_avgs: Vec<Option<f32>> = records.iter().map(|r| r.temp_avg).collect();
433
434        // Other weather metrics (optional)
435        let precipitations: Vec<Option<f32>> = records.iter().map(|r| r.precipitation).collect();
436        let wind_speeds: Vec<Option<f32>> = records.iter().map(|r| r.wind_speed).collect();
437
438        // Quality flags (optional)
439        let temp_qualities: Vec<Option<String>> =
440            records.iter().map(|r| r.temp_quality.clone()).collect();
441        let precip_qualities: Vec<Option<String>> =
442            records.iter().map(|r| r.precip_quality.clone()).collect();
443        let wind_qualities: Vec<Option<String>> =
444            records.iter().map(|r| r.wind_quality.clone()).collect();
445
446        // Physical validation flags (optional)
447        let temp_validations: Vec<Option<String>> = records
448            .iter()
449            .map(|r| r.temp_validation.map(|v| format!("{:?}", v)))
450            .collect();
451        let precip_validations: Vec<Option<String>> = records
452            .iter()
453            .map(|r| r.precip_validation.map(|v| format!("{:?}", v)))
454            .collect();
455        let wind_validations: Vec<Option<String>> = records
456            .iter()
457            .map(|r| r.wind_validation.map(|v| format!("{:?}", v)))
458            .collect();
459
460        // Create Arrow arrays
461        let station_id_array = Arc::new(UInt32Array::from(station_ids));
462        let station_name_array = Arc::new(StringArray::from(station_names));
463        let date_array = Arc::new(Date32Array::from(dates));
464        let latitude_array = Arc::new(Float64Array::from(latitudes));
465        let longitude_array = Arc::new(Float64Array::from(longitudes));
466
467        // Create optional arrays
468        let temp_min_array = Arc::new(Float32Array::from(temp_mins));
469        let temp_max_array = Arc::new(Float32Array::from(temp_maxs));
470        let temp_avg_array = Arc::new(Float32Array::from(temp_avgs));
471        let precipitation_array = Arc::new(Float32Array::from(precipitations));
472        let wind_speed_array = Arc::new(Float32Array::from(wind_speeds));
473
474        let temp_quality_array = Arc::new(StringArray::from(temp_qualities));
475        let precip_quality_array = Arc::new(StringArray::from(precip_qualities));
476        let wind_quality_array = Arc::new(StringArray::from(wind_qualities));
477
478        let temp_validation_array = Arc::new(StringArray::from(temp_validations));
479        let precip_validation_array = Arc::new(StringArray::from(precip_validations));
480        let wind_validation_array = Arc::new(StringArray::from(wind_validations));
481
482        // Create record batch
483        let batch = RecordBatch::try_new(
484            schema,
485            vec![
486                station_id_array,
487                station_name_array,
488                date_array,
489                latitude_array,
490                longitude_array,
491                temp_min_array,
492                temp_max_array,
493                temp_avg_array,
494                precipitation_array,
495                wind_speed_array,
496                temp_quality_array,
497                precip_quality_array,
498                wind_quality_array,
499                temp_validation_array,
500                precip_validation_array,
501                wind_validation_array,
502            ],
503        )?;
504
505        Ok(batch)
506    }
507
508    /// Read sample weather records from Parquet file
509    pub fn read_sample_weather_records(
510        &self,
511        path: &Path,
512        limit: usize,
513    ) -> Result<Vec<WeatherRecord>> {
514        use arrow::array::*;
515        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
516
517        let file = File::open(path)?;
518        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
519            .with_batch_size(limit.min(8192))
520            .build()?;
521
522        let mut records = Vec::new();
523        let mut total_read = 0;
524
525        for batch_result in parquet_reader {
526            let batch = batch_result?;
527
528            // Extract arrays from the batch - handle both old and new schema
529            let num_columns = batch.num_columns();
530
531            if num_columns < 13 {
532                // Old schema format - return empty for now
533                return Ok(Vec::new());
534            }
535
536            let has_validation_fields = num_columns >= 16;
537
538            // New WeatherRecord schema
539            let station_ids = batch
540                .column(0)
541                .as_any()
542                .downcast_ref::<UInt32Array>()
543                .ok_or_else(|| {
544                    crate::error::ProcessingError::Config(
545                        "Invalid station_id column type".to_string(),
546                    )
547                })?;
548            let station_names = batch
549                .column(1)
550                .as_any()
551                .downcast_ref::<StringArray>()
552                .ok_or_else(|| {
553                    crate::error::ProcessingError::Config(
554                        "Invalid station_name column type".to_string(),
555                    )
556                })?;
557            let dates = batch
558                .column(2)
559                .as_any()
560                .downcast_ref::<Date32Array>()
561                .ok_or_else(|| {
562                    crate::error::ProcessingError::Config("Invalid date column type".to_string())
563                })?;
564            let latitudes = batch
565                .column(3)
566                .as_any()
567                .downcast_ref::<Float64Array>()
568                .ok_or_else(|| {
569                    crate::error::ProcessingError::Config(
570                        "Invalid latitude column type".to_string(),
571                    )
572                })?;
573            let longitudes = batch
574                .column(4)
575                .as_any()
576                .downcast_ref::<Float64Array>()
577                .ok_or_else(|| {
578                    crate::error::ProcessingError::Config(
579                        "Invalid longitude column type".to_string(),
580                    )
581                })?;
582
583            // Optional temperature fields
584            let temp_mins = batch
585                .column(5)
586                .as_any()
587                .downcast_ref::<Float32Array>()
588                .ok_or_else(|| {
589                    crate::error::ProcessingError::Config(
590                        "Invalid temp_min column type".to_string(),
591                    )
592                })?;
593            let temp_maxs = batch
594                .column(6)
595                .as_any()
596                .downcast_ref::<Float32Array>()
597                .ok_or_else(|| {
598                    crate::error::ProcessingError::Config(
599                        "Invalid temp_max column type".to_string(),
600                    )
601                })?;
602            let temp_avgs = batch
603                .column(7)
604                .as_any()
605                .downcast_ref::<Float32Array>()
606                .ok_or_else(|| {
607                    crate::error::ProcessingError::Config(
608                        "Invalid temp_avg column type".to_string(),
609                    )
610                })?;
611
612            // Optional other weather metrics
613            let precipitations = batch
614                .column(8)
615                .as_any()
616                .downcast_ref::<Float32Array>()
617                .ok_or_else(|| {
618                    crate::error::ProcessingError::Config(
619                        "Invalid precipitation column type".to_string(),
620                    )
621                })?;
622            let wind_speeds = batch
623                .column(9)
624                .as_any()
625                .downcast_ref::<Float32Array>()
626                .ok_or_else(|| {
627                    crate::error::ProcessingError::Config(
628                        "Invalid wind_speed column type".to_string(),
629                    )
630                })?;
631
632            // Optional quality flags
633            let temp_qualities = batch
634                .column(10)
635                .as_any()
636                .downcast_ref::<StringArray>()
637                .ok_or_else(|| {
638                    crate::error::ProcessingError::Config(
639                        "Invalid temp_quality column type".to_string(),
640                    )
641                })?;
642            let precip_qualities = batch
643                .column(11)
644                .as_any()
645                .downcast_ref::<StringArray>()
646                .ok_or_else(|| {
647                    crate::error::ProcessingError::Config(
648                        "Invalid precip_quality column type".to_string(),
649                    )
650                })?;
651            let wind_qualities = batch
652                .column(12)
653                .as_any()
654                .downcast_ref::<StringArray>()
655                .ok_or_else(|| {
656                    crate::error::ProcessingError::Config(
657                        "Invalid wind_quality column type".to_string(),
658                    )
659                })?;
660
661            // Optional validation fields (new schema)
662            let temp_validations = if has_validation_fields {
663                Some(
664                    batch
665                        .column(13)
666                        .as_any()
667                        .downcast_ref::<StringArray>()
668                        .ok_or_else(|| {
669                            crate::error::ProcessingError::Config(
670                                "Invalid temp_validation column type".to_string(),
671                            )
672                        })?,
673                )
674            } else {
675                None
676            };
677            let precip_validations = if has_validation_fields {
678                Some(
679                    batch
680                        .column(14)
681                        .as_any()
682                        .downcast_ref::<StringArray>()
683                        .ok_or_else(|| {
684                            crate::error::ProcessingError::Config(
685                                "Invalid precip_validation column type".to_string(),
686                            )
687                        })?,
688                )
689            } else {
690                None
691            };
692            let wind_validations = if has_validation_fields {
693                Some(
694                    batch
695                        .column(15)
696                        .as_any()
697                        .downcast_ref::<StringArray>()
698                        .ok_or_else(|| {
699                            crate::error::ProcessingError::Config(
700                                "Invalid wind_validation column type".to_string(),
701                            )
702                        })?,
703                )
704            } else {
705                None
706            };
707
708            // Convert to WeatherRecord objects
709            let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
710
711            for i in 0..batch_records_to_read {
712                let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i))
713                    .ok_or_else(|| {
714                        crate::error::ProcessingError::Config(
715                            "Invalid date in Parquet file".to_string(),
716                        )
717                    })?;
718
719                use crate::models::weather::PhysicalValidity;
720
721                let record = WeatherRecord::new_raw(
722                    station_ids.value(i),
723                    station_names.value(i).to_string(),
724                    date,
725                    latitudes.value(i),
726                    longitudes.value(i),
727                    if temp_mins.is_null(i) {
728                        None
729                    } else {
730                        Some(temp_mins.value(i))
731                    },
732                    if temp_maxs.is_null(i) {
733                        None
734                    } else {
735                        Some(temp_maxs.value(i))
736                    },
737                    if temp_avgs.is_null(i) {
738                        None
739                    } else {
740                        Some(temp_avgs.value(i))
741                    },
742                    if precipitations.is_null(i) {
743                        None
744                    } else {
745                        Some(precipitations.value(i))
746                    },
747                    if wind_speeds.is_null(i) {
748                        None
749                    } else {
750                        Some(wind_speeds.value(i))
751                    },
752                    if temp_qualities.is_null(i) {
753                        None
754                    } else {
755                        Some(temp_qualities.value(i).to_string())
756                    },
757                    if precip_qualities.is_null(i) {
758                        None
759                    } else {
760                        Some(precip_qualities.value(i).to_string())
761                    },
762                    if wind_qualities.is_null(i) {
763                        None
764                    } else {
765                        Some(wind_qualities.value(i).to_string())
766                    },
767                    // Parse validation fields if available
768                    temp_validations.and_then(|arr| {
769                        if arr.is_null(i) {
770                            None
771                        } else {
772                            PhysicalValidity::parse(arr.value(i))
773                        }
774                    }),
775                    precip_validations.and_then(|arr| {
776                        if arr.is_null(i) {
777                            None
778                        } else {
779                            PhysicalValidity::parse(arr.value(i))
780                        }
781                    }),
782                    wind_validations.and_then(|arr| {
783                        if arr.is_null(i) {
784                            None
785                        } else {
786                            PhysicalValidity::parse(arr.value(i))
787                        }
788                    }),
789                );
790
791                records.push(record);
792                total_read += 1;
793
794                if total_read >= limit {
795                    break;
796                }
797            }
798
799            if total_read >= limit {
800                break;
801            }
802        }
803
804        Ok(records)
805    }
806
807    /// Detect the schema type of a Parquet file
808    pub fn detect_schema_type(&self, path: &Path) -> Result<SchemaType> {
809        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
810
811        let file = File::open(path)?;
812        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
813        let schema = builder.schema();
814
815        // Check number of columns to determine schema type
816        let num_columns = schema.fields().len();
817
818        if num_columns == 9 {
819            // Old ConsolidatedRecord schema: station_id, station_name, date, lat, lon, min_temp, max_temp, avg_temp, quality_flags
820            Ok(SchemaType::ConsolidatedRecord)
821        } else if num_columns == 13 || num_columns == 16 {
822            // WeatherRecord schema: 13 cols = original, 16 cols = with validation fields
823            Ok(SchemaType::WeatherRecord)
824        } else {
825            Ok(SchemaType::Unknown)
826        }
827    }
828
829    /// Analyze a WeatherRecord Parquet file comprehensively
830    pub fn analyze_weather_dataset(
831        &self,
832        path: &Path,
833        sample_size: usize,
834    ) -> Result<WeatherDatasetSummary> {
835        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
836        use std::collections::HashSet;
837
838        let file = File::open(path)?;
839        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
840
841        let mut total_records = 0;
842        let mut stations: HashSet<u32> = HashSet::new();
843        let mut all_records = Vec::new();
844
845        // Bounds tracking
846        let mut min_lat = f64::MAX;
847        let mut max_lat = f64::MIN;
848        let mut min_lon = f64::MAX;
849        let mut max_lon = f64::MIN;
850        let mut min_date = None;
851        let mut max_date = None;
852
853        // Metric statistics
854        let mut temp_records = 0;
855        let mut temp_stations: HashSet<u32> = HashSet::new();
856        let mut temp_dates = Vec::new();
857        let mut precip_records = 0;
858        let mut precip_stations: HashSet<u32> = HashSet::new();
859        let mut precip_dates = Vec::new();
860        let mut wind_records = 0;
861        let mut wind_stations: HashSet<u32> = HashSet::new();
862        let mut wind_dates = Vec::new();
863
864        // Extreme tracking
865        let mut coldest_record: Option<WeatherRecord> = None;
866        let mut hottest_record: Option<WeatherRecord> = None;
867        let mut wettest_record: Option<WeatherRecord> = None;
868        let mut windiest_record: Option<WeatherRecord> = None;
869        let mut min_temp_val = f32::MAX;
870        let mut max_temp_val = f32::MIN;
871        let mut max_precip_val = f32::MIN;
872        let mut max_wind_val = f32::MIN;
873
874        // Enhanced data quality tracking
875        let mut ecad_valid = 0;
876        let mut ecad_suspect = 0;
877        let mut ecad_missing = 0;
878        let mut physically_valid = 0;
879        let mut physically_suspect = 0;
880        let mut physically_invalid = 0;
881        let mut combined_valid = 0;
882        let mut combined_suspect_original = 0;
883        let mut combined_suspect_range = 0;
884        let mut combined_suspect_both = 0;
885        let mut combined_invalid = 0;
886        let mut combined_missing = 0;
887
888        for batch_result in parquet_reader {
889            let batch = batch_result?;
890            let num_rows = batch.num_rows();
891
892            if batch.num_columns() < 13 {
893                continue; // Skip if not WeatherRecord format
894            }
895
896            let has_validation_fields = batch.num_columns() >= 16;
897
898            // Extract arrays
899            let station_ids = batch
900                .column(0)
901                .as_any()
902                .downcast_ref::<UInt32Array>()
903                .unwrap();
904            let station_names = batch
905                .column(1)
906                .as_any()
907                .downcast_ref::<StringArray>()
908                .unwrap();
909            let dates = batch
910                .column(2)
911                .as_any()
912                .downcast_ref::<Date32Array>()
913                .unwrap();
914            let latitudes = batch
915                .column(3)
916                .as_any()
917                .downcast_ref::<Float64Array>()
918                .unwrap();
919            let longitudes = batch
920                .column(4)
921                .as_any()
922                .downcast_ref::<Float64Array>()
923                .unwrap();
924            let temp_mins = batch
925                .column(5)
926                .as_any()
927                .downcast_ref::<Float32Array>()
928                .unwrap();
929            let temp_maxs = batch
930                .column(6)
931                .as_any()
932                .downcast_ref::<Float32Array>()
933                .unwrap();
934            let temp_avgs = batch
935                .column(7)
936                .as_any()
937                .downcast_ref::<Float32Array>()
938                .unwrap();
939            let precipitations = batch
940                .column(8)
941                .as_any()
942                .downcast_ref::<Float32Array>()
943                .unwrap();
944            let wind_speeds = batch
945                .column(9)
946                .as_any()
947                .downcast_ref::<Float32Array>()
948                .unwrap();
949            let temp_qualities = batch
950                .column(10)
951                .as_any()
952                .downcast_ref::<StringArray>()
953                .unwrap();
954            let precip_qualities = batch
955                .column(11)
956                .as_any()
957                .downcast_ref::<StringArray>()
958                .unwrap();
959            let wind_qualities = batch
960                .column(12)
961                .as_any()
962                .downcast_ref::<StringArray>()
963                .unwrap();
964
965            // Optional validation fields
966            let temp_validations = if has_validation_fields {
967                Some(
968                    batch
969                        .column(13)
970                        .as_any()
971                        .downcast_ref::<StringArray>()
972                        .unwrap(),
973                )
974            } else {
975                None
976            };
977            let precip_validations = if has_validation_fields {
978                Some(
979                    batch
980                        .column(14)
981                        .as_any()
982                        .downcast_ref::<StringArray>()
983                        .unwrap(),
984                )
985            } else {
986                None
987            };
988            let wind_validations = if has_validation_fields {
989                Some(
990                    batch
991                        .column(15)
992                        .as_any()
993                        .downcast_ref::<StringArray>()
994                        .unwrap(),
995                )
996            } else {
997                None
998            };
999
1000            for i in 0..num_rows {
1001                total_records += 1;
1002                let station_id = station_ids.value(i);
1003                stations.insert(station_id);
1004
1005                let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i)).unwrap();
1006
1007                // Update date bounds
1008                min_date = Some(min_date.map_or(date, |d: chrono::NaiveDate| d.min(date)));
1009                max_date = Some(max_date.map_or(date, |d: chrono::NaiveDate| d.max(date)));
1010
1011                // Update geographic bounds
1012                let lat = latitudes.value(i);
1013                let lon = longitudes.value(i);
1014                min_lat = min_lat.min(lat);
1015                max_lat = max_lat.max(lat);
1016                min_lon = min_lon.min(lon);
1017                max_lon = max_lon.max(lon);
1018
1019                // Create record for sampling and analysis
1020                use crate::models::weather::{DataQuality, PhysicalValidity};
1021
1022                let record = WeatherRecord::new_raw(
1023                    station_id,
1024                    station_names.value(i).to_string(),
1025                    date,
1026                    lat,
1027                    lon,
1028                    if temp_mins.is_null(i) {
1029                        None
1030                    } else {
1031                        Some(temp_mins.value(i))
1032                    },
1033                    if temp_maxs.is_null(i) {
1034                        None
1035                    } else {
1036                        Some(temp_maxs.value(i))
1037                    },
1038                    if temp_avgs.is_null(i) {
1039                        None
1040                    } else {
1041                        Some(temp_avgs.value(i))
1042                    },
1043                    if precipitations.is_null(i) {
1044                        None
1045                    } else {
1046                        Some(precipitations.value(i))
1047                    },
1048                    if wind_speeds.is_null(i) {
1049                        None
1050                    } else {
1051                        Some(wind_speeds.value(i))
1052                    },
1053                    if temp_qualities.is_null(i) {
1054                        None
1055                    } else {
1056                        Some(temp_qualities.value(i).to_string())
1057                    },
1058                    if precip_qualities.is_null(i) {
1059                        None
1060                    } else {
1061                        Some(precip_qualities.value(i).to_string())
1062                    },
1063                    if wind_qualities.is_null(i) {
1064                        None
1065                    } else {
1066                        Some(wind_qualities.value(i).to_string())
1067                    },
1068                    // Parse validation fields if available
1069                    temp_validations.and_then(|arr| {
1070                        if arr.is_null(i) {
1071                            None
1072                        } else {
1073                            PhysicalValidity::parse(arr.value(i))
1074                        }
1075                    }),
1076                    precip_validations.and_then(|arr| {
1077                        if arr.is_null(i) {
1078                            None
1079                        } else {
1080                            PhysicalValidity::parse(arr.value(i))
1081                        }
1082                    }),
1083                    wind_validations.and_then(|arr| {
1084                        if arr.is_null(i) {
1085                            None
1086                        } else {
1087                            PhysicalValidity::parse(arr.value(i))
1088                        }
1089                    }),
1090                );
1091
1092                // Track metrics
1093                if record.has_temperature_data() {
1094                    temp_records += 1;
1095                    temp_stations.insert(station_id);
1096                    temp_dates.push(date);
1097
1098                    // Track extreme temperatures (exclude invalid values)
1099                    let temp_quality = record.assess_temperature_quality();
1100                    if !matches!(temp_quality, DataQuality::Invalid) {
1101                        if let Some(min_temp) = record.temp_min {
1102                            if min_temp < min_temp_val {
1103                                min_temp_val = min_temp;
1104                                coldest_record = Some(record.clone());
1105                            }
1106                        }
1107
1108                        if let Some(max_temp) = record.temp_max {
1109                            if max_temp > max_temp_val {
1110                                max_temp_val = max_temp;
1111                                hottest_record = Some(record.clone());
1112                            }
1113                        }
1114                    }
1115                }
1116
1117                if record.has_precipitation() {
1118                    precip_records += 1;
1119                    precip_stations.insert(station_id);
1120                    precip_dates.push(date);
1121
1122                    // Track extreme precipitation (exclude invalid values)
1123                    let precip_quality = record.assess_precipitation_quality();
1124                    if !matches!(precip_quality, DataQuality::Invalid) {
1125                        if let Some(precip) = record.precipitation {
1126                            if precip > max_precip_val {
1127                                max_precip_val = precip;
1128                                wettest_record = Some(record.clone());
1129                            }
1130                        }
1131                    }
1132                }
1133
1134                if record.has_wind_speed() {
1135                    wind_records += 1;
1136                    wind_stations.insert(station_id);
1137                    wind_dates.push(date);
1138
1139                    // Track extreme wind speed (exclude invalid values)
1140                    let wind_quality = record.assess_wind_quality();
1141                    if !matches!(wind_quality, DataQuality::Invalid) {
1142                        if let Some(wind) = record.wind_speed {
1143                            if wind > max_wind_val {
1144                                max_wind_val = wind;
1145                                windiest_record = Some(record.clone());
1146                            }
1147                        }
1148                    }
1149                }
1150
1151                // Enhanced data quality analysis
1152
1153                // Track ECAD quality flags for each metric present
1154
1155                // Temperature ECAD flags
1156                if record.has_temperature_data() {
1157                    if let Some(temp_quality) = &record.temp_quality {
1158                        if temp_quality.contains('0') {
1159                            ecad_valid += 1;
1160                        } else if temp_quality.contains('1') {
1161                            ecad_suspect += 1;
1162                        } else if temp_quality.contains('9') {
1163                            ecad_missing += 1;
1164                        }
1165                    }
1166                }
1167
1168                // Precipitation ECAD flags
1169                if record.has_precipitation() {
1170                    if let Some(precip_quality) = &record.precip_quality {
1171                        if precip_quality == "0" {
1172                            ecad_valid += 1;
1173                        } else if precip_quality == "1" {
1174                            ecad_suspect += 1;
1175                        } else if precip_quality == "9" {
1176                            ecad_missing += 1;
1177                        }
1178                    }
1179                }
1180
1181                // Wind speed ECAD flags
1182                if record.has_wind_speed() {
1183                    if let Some(wind_quality) = &record.wind_quality {
1184                        if wind_quality == "0" {
1185                            ecad_valid += 1;
1186                        } else if wind_quality == "1" {
1187                            ecad_suspect += 1;
1188                        } else if wind_quality == "9" {
1189                            ecad_missing += 1;
1190                        }
1191                    }
1192                }
1193
1194                // Track physical validation for each metric present
1195                if let Some(validation) = record.temp_validation {
1196                    match validation {
1197                        PhysicalValidity::Valid => physically_valid += 1,
1198                        PhysicalValidity::Suspect => physically_suspect += 1,
1199                        PhysicalValidity::Invalid => physically_invalid += 1,
1200                    }
1201                }
1202                if let Some(validation) = record.precip_validation {
1203                    match validation {
1204                        PhysicalValidity::Valid => physically_valid += 1,
1205                        PhysicalValidity::Suspect => physically_suspect += 1,
1206                        PhysicalValidity::Invalid => physically_invalid += 1,
1207                    }
1208                }
1209                if let Some(validation) = record.wind_validation {
1210                    match validation {
1211                        PhysicalValidity::Valid => physically_valid += 1,
1212                        PhysicalValidity::Suspect => physically_suspect += 1,
1213                        PhysicalValidity::Invalid => physically_invalid += 1,
1214                    }
1215                }
1216
1217                // Track combined quality assessment
1218                let temp_quality = record.assess_temperature_quality();
1219                let precip_quality = record.assess_precipitation_quality();
1220                let wind_quality = record.assess_wind_quality();
1221
1222                for quality in [temp_quality, precip_quality, wind_quality] {
1223                    match quality {
1224                        DataQuality::Valid => combined_valid += 1,
1225                        DataQuality::SuspectOriginal => combined_suspect_original += 1,
1226                        DataQuality::SuspectRange => combined_suspect_range += 1,
1227                        DataQuality::SuspectBoth => combined_suspect_both += 1,
1228                        DataQuality::Invalid => combined_invalid += 1,
1229                        DataQuality::Missing => combined_missing += 1,
1230                    }
1231                }
1232
1233                // Store for sampling
1234                all_records.push(record);
1235            }
1236        }
1237
1238        // Create diverse sampling
1239        let sample_records = self.create_diverse_sample(&all_records, sample_size);
1240
1241        // Calculate temporal ranges per metric
1242        temp_dates.sort();
1243        precip_dates.sort();
1244        wind_dates.sort();
1245
1246        let temperature_range = if !temp_dates.is_empty() {
1247            Some((temp_dates[0], temp_dates[temp_dates.len() - 1]))
1248        } else {
1249            None
1250        };
1251
1252        let precipitation_range = if !precip_dates.is_empty() {
1253            Some((precip_dates[0], precip_dates[precip_dates.len() - 1]))
1254        } else {
1255            None
1256        };
1257
1258        let wind_range = if !wind_dates.is_empty() {
1259            Some((wind_dates[0], wind_dates[wind_dates.len() - 1]))
1260        } else {
1261            None
1262        };
1263
1264        // Countries (simplified - could be enhanced with actual geographic lookup)
1265        let countries = if min_lon < -5.0 && max_lat > 53.0 {
1266            vec!["GB".to_string(), "IE".to_string()]
1267        } else if max_lat > 55.0 {
1268            vec!["GB".to_string()]
1269        } else {
1270            vec!["IE".to_string()]
1271        };
1272
1273        Ok(WeatherDatasetSummary {
1274            total_records,
1275            total_stations: stations.len(),
1276            geographic_bounds: GeographicBounds {
1277                min_lat,
1278                max_lat,
1279                min_lon,
1280                max_lon,
1281                countries,
1282            },
1283            temporal_coverage: TemporalCoverage {
1284                overall_start: min_date
1285                    .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap()),
1286                overall_end: max_date
1287                    .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(2000, 1, 1).unwrap()),
1288                temperature_range,
1289                precipitation_range,
1290                wind_range,
1291            },
1292            metric_statistics: MetricStatistics {
1293                temperature_records: temp_records,
1294                temperature_stations: temp_stations.len(),
1295                precipitation_records: precip_records,
1296                precipitation_stations: precip_stations.len(),
1297                wind_records,
1298                wind_stations: wind_stations.len(),
1299                temperature_range: if min_temp_val != f32::MAX && max_temp_val != f32::MIN {
1300                    Some((min_temp_val, max_temp_val))
1301                } else {
1302                    None
1303                },
1304                precipitation_range: if max_precip_val != f32::MIN {
1305                    Some((0.0, max_precip_val))
1306                } else {
1307                    None
1308                },
1309                wind_range: if max_wind_val != f32::MIN {
1310                    Some((0.0, max_wind_val))
1311                } else {
1312                    None
1313                },
1314            },
1315            data_quality: EnhancedDataQuality {
1316                ecad_valid,
1317                ecad_suspect,
1318                ecad_missing,
1319                physically_valid,
1320                physically_suspect,
1321                physically_invalid,
1322                combined_valid,
1323                combined_suspect_original,
1324                combined_suspect_range,
1325                combined_suspect_both,
1326                combined_invalid,
1327                combined_missing,
1328                validation_errors: 0, // Will be populated from integrity report if available
1329            },
1330            sample_records,
1331            extreme_records: ExtremeRecords {
1332                coldest: coldest_record,
1333                hottest: hottest_record,
1334                wettest: wettest_record,
1335                windiest: windiest_record,
1336            },
1337        })
1338    }
1339
1340    /// Create diverse sample of records for display
1341    fn create_diverse_sample(
1342        &self,
1343        all_records: &[WeatherRecord],
1344        sample_size: usize,
1345    ) -> Vec<WeatherRecord> {
1346        use std::collections::HashMap;
1347
1348        if all_records.is_empty() || sample_size == 0 {
1349            return Vec::new();
1350        }
1351
1352        let mut samples = Vec::new();
1353        let mut station_counts: HashMap<String, usize> = HashMap::new();
1354
1355        // Strategy: Sample diverse stations and metric combinations
1356        let total_records = all_records.len();
1357        let step = if total_records > sample_size * 100 {
1358            total_records / (sample_size * 50) // Sample more spread out for large datasets
1359        } else {
1360            std::cmp::max(1, total_records / sample_size)
1361        };
1362
1363        for (i, record) in all_records.iter().enumerate() {
1364            if i % step == 0 && samples.len() < sample_size {
1365                // Limit samples per station for diversity
1366                let station_count = station_counts
1367                    .entry(record.station_name.clone())
1368                    .or_insert(0);
1369                if *station_count < 2 {
1370                    // Max 2 samples per station
1371                    samples.push(record.clone());
1372                    *station_count += 1;
1373                }
1374            }
1375        }
1376
1377        // If we still need more samples, fill with any remaining records
1378        if samples.len() < sample_size {
1379            for record in all_records.iter().step_by(step * 2) {
1380                if samples.len() >= sample_size {
1381                    break;
1382                }
1383                if !samples
1384                    .iter()
1385                    .any(|s| s.station_name == record.station_name && s.date == record.date)
1386                {
1387                    samples.push(record.clone());
1388                }
1389            }
1390        }
1391
1392        samples.truncate(sample_size);
1393        samples
1394    }
1395}
1396
/// Layout of a Parquet file, as classified by `ParquetWriter::detect_schema_type`.
///
/// A fieldless enum, so it derives `Clone`, `Copy` and `Eq` in addition to `PartialEq`
/// and can be passed around and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaType {
    /// Legacy 9-column `ConsolidatedRecord` layout.
    ConsolidatedRecord,
    /// `WeatherRecord` layout: 13 columns, or 16 with the physical-validation columns.
    WeatherRecord,
    /// Any other column count.
    Unknown,
}
1403
/// Whole-file summary of a `WeatherRecord`-layout Parquet file.
///
/// The field notes below describe how `ParquetWriter::analyze_weather_dataset` fills this
/// struct.
#[derive(Debug, Clone)]
pub struct WeatherDatasetSummary {
    /// Rows read from batches that have at least 13 columns.
    pub total_records: usize,
    /// Number of distinct station ids among those rows.
    pub total_stations: usize,
    /// Latitude/longitude bounds plus a coarse country guess.
    pub geographic_bounds: GeographicBounds,
    /// Overall and per-metric date ranges.
    pub temporal_coverage: TemporalCoverage,
    /// Per-metric record/station counts and value ranges.
    pub metric_statistics: MetricStatistics,
    /// Tallies of ECAD flags, physical validation and combined quality.
    pub data_quality: EnhancedDataQuality,
    /// Up to the requested number of records chosen for display.
    pub sample_records: Vec<WeatherRecord>,
    /// Records holding the coldest/hottest/wettest/windiest values.
    pub extreme_records: ExtremeRecords,
}
1415
/// Geographic extent of the analysed rows.
///
/// When `ParquetWriter::analyze_weather_dataset` reads no rows, the sentinels `f64::MAX`
/// (minimums) and `f64::MIN` (maximums) are left in place, so min > max.
#[derive(Debug, Clone)]
pub struct GeographicBounds {
    /// Smallest latitude seen.
    pub min_lat: f64,
    /// Largest latitude seen.
    pub max_lat: f64,
    /// Smallest longitude seen.
    pub min_lon: f64,
    /// Largest longitude seen.
    pub max_lon: f64,
    /// Country codes guessed from the bounds. `analyze_weather_dataset` only produces
    /// "GB" and/or "IE" using a simple threshold heuristic, not a geographic lookup.
    pub countries: Vec<String>,
}
1424
/// Date coverage of the analysed rows, overall and per metric.
#[derive(Debug, Clone)]
pub struct TemporalCoverage {
    /// Earliest date seen (`analyze_weather_dataset` substitutes 1900-01-01 if no rows).
    pub overall_start: chrono::NaiveDate,
    /// Latest date seen (`analyze_weather_dataset` substitutes 2000-01-01 if no rows).
    pub overall_end: chrono::NaiveDate,
    /// Inclusive (earliest, latest) dates of rows with temperature data; `None` if none.
    pub temperature_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    /// Inclusive (earliest, latest) dates of rows with precipitation; `None` if none.
    pub precipitation_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    /// Inclusive (earliest, latest) dates of rows with wind speed; `None` if none.
    pub wind_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
}
1433
/// Per-metric counts and value ranges, as filled by `ParquetWriter::analyze_weather_dataset`.
#[derive(Debug, Clone)]
pub struct MetricStatistics {
    /// Rows with temperature data.
    pub temperature_records: usize,
    /// Distinct stations with at least one temperature row.
    pub temperature_stations: usize,
    /// Rows with a precipitation value.
    pub precipitation_records: usize,
    /// Distinct stations with at least one precipitation row.
    pub precipitation_stations: usize,
    /// Rows with a wind-speed value.
    pub wind_records: usize,
    /// Distinct stations with at least one wind-speed row.
    pub wind_stations: usize,
    /// (lowest `temp_min`, highest `temp_max`) over rows whose temperature quality is not
    /// `Invalid`; `None` unless both ends were found.
    pub temperature_range: Option<(f32, f32)>,
    /// `(0.0, highest precipitation)` over non-`Invalid` rows. The lower bound is a fixed
    /// 0.0, not a measured minimum.
    pub precipitation_range: Option<(f32, f32)>,
    /// `(0.0, highest wind speed)` over non-`Invalid` rows. The lower bound is a fixed 0.0,
    /// not a measured minimum.
    pub wind_range: Option<(f32, f32)>,
}
1446
/// Data-quality tallies for a dataset.
///
/// As filled by `ParquetWriter::analyze_weather_dataset`, every count is per metric
/// (temperature, precipitation, wind), not per record, so one record can contribute up to
/// three counts to each group.
#[derive(Debug, Clone)]
pub struct EnhancedDataQuality {
    // ECAD quality flag assessment
    /// Metrics whose ECAD flag is "0" (temperature: flag contains '0').
    pub ecad_valid: usize,
    /// Metrics whose ECAD flag is "1" (temperature: flag contains '1' but not '0').
    pub ecad_suspect: usize,
    /// Metrics whose ECAD flag is "9" (temperature: flag contains '9' but neither '0' nor '1').
    pub ecad_missing: usize,

    // Physical validation assessment
    /// Metrics whose physical-validation value is `Valid` (only 16-column files carry one).
    pub physically_valid: usize,
    /// Metrics whose physical-validation value is `Suspect`.
    pub physically_suspect: usize,
    /// Metrics whose physical-validation value is `Invalid`.
    pub physically_invalid: usize,

    // Combined quality assessment
    // Each record adds exactly three counts here, one per metric's combined assessment.
    /// Metrics assessed `Valid`.
    pub combined_valid: usize,
    /// Metrics assessed `SuspectOriginal`.
    pub combined_suspect_original: usize,
    /// Metrics assessed `SuspectRange`.
    pub combined_suspect_range: usize,
    /// Metrics assessed `SuspectBoth`.
    pub combined_suspect_both: usize,
    /// Metrics assessed `Invalid`.
    pub combined_invalid: usize,
    /// Metrics assessed `Missing`.
    pub combined_missing: usize,

    /// Set to 0 by `analyze_weather_dataset`; per its comment, intended to be filled from
    /// an integrity report when one is available.
    pub validation_errors: usize,
}
1469
/// Records holding the most extreme values.
///
/// As filled by `ParquetWriter::analyze_weather_dataset`: each field is a clone of the first
/// record reaching the extreme, values whose quality assessment is `Invalid` are ignored,
/// and a field is `None` when no qualifying record exists.
#[derive(Debug, Clone)]
pub struct ExtremeRecords {
    /// Record with the lowest `temp_min`.
    pub coldest: Option<WeatherRecord>,
    /// Record with the highest `temp_max`.
    pub hottest: Option<WeatherRecord>,
    /// Record with the highest `precipitation`.
    pub wettest: Option<WeatherRecord>,
    /// Record with the highest `wind_speed`.
    pub windiest: Option<WeatherRecord>,
}
1477
1478impl WeatherDatasetSummary {
1479    pub fn display_comprehensive_summary(&self) -> String {
1480        let mut summary = String::new();
1481
1482        // Header
1483        summary.push_str("UNIFIED WEATHER DATASET ANALYSIS\n");
1484        summary.push_str("================================\n\n");
1485
1486        // Dataset Overview
1487        summary.push_str(&format!(
1488            "Dataset Overview:\n\
1489            - Records: {} unified weather records\n\
1490            - Stations: {} across {} ({:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E)\n\
1491            - Timespan: {} to {} ({} years)\n\n",
1492            self.total_records,
1493            self.total_stations,
1494            self.geographic_bounds.countries.join("/"),
1495            self.geographic_bounds.min_lat,
1496            self.geographic_bounds.max_lat,
1497            if self.geographic_bounds.min_lon < 0.0 {
1498                -self.geographic_bounds.min_lon
1499            } else {
1500                self.geographic_bounds.min_lon
1501            },
1502            self.geographic_bounds.max_lon,
1503            self.temporal_coverage.overall_start,
1504            self.temporal_coverage.overall_end,
1505            self.temporal_coverage.overall_end.year() - self.temporal_coverage.overall_start.year()
1506        ));
1507
1508        // Metric Coverage Table
1509        summary.push_str("Metric Coverage:\n");
1510        summary.push_str(
1511            "┌─────────────────┬──────────┬─────────────┬──────────────┬─────────────┐\n",
1512        );
1513        summary.push_str(
1514            "│ Metric          │ Stations │ Records     │ Coverage     │ Date Range  │\n",
1515        );
1516        summary.push_str(
1517            "├─────────────────┼──────────┼─────────────┼──────────────┼─────────────┤\n",
1518        );
1519
1520        // Temperature row
1521        let temp_coverage = if self.total_records > 0 {
1522            (self.metric_statistics.temperature_records as f32 / self.total_records as f32) * 100.0
1523        } else {
1524            0.0
1525        };
1526
1527        let temp_range = if let Some((start, end)) = self.temporal_coverage.temperature_range {
1528            format!("{}-{}", start.year(), end.year())
1529        } else {
1530            "N/A".to_string()
1531        };
1532
1533        summary.push_str(&format!(
1534            "│ Temperature     │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1535            self.metric_statistics.temperature_stations,
1536            self.metric_statistics.temperature_records,
1537            temp_coverage,
1538            temp_range
1539        ));
1540
1541        // Precipitation row
1542        let precip_coverage = if self.total_records > 0 {
1543            (self.metric_statistics.precipitation_records as f32 / self.total_records as f32)
1544                * 100.0
1545        } else {
1546            0.0
1547        };
1548
1549        let precip_range = if let Some((start, end)) = self.temporal_coverage.precipitation_range {
1550            format!("{}-{}", start.year(), end.year())
1551        } else {
1552            "N/A".to_string()
1553        };
1554
1555        summary.push_str(&format!(
1556            "│ Precipitation   │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1557            self.metric_statistics.precipitation_stations,
1558            self.metric_statistics.precipitation_records,
1559            precip_coverage,
1560            precip_range
1561        ));
1562
1563        // Wind row
1564        let wind_coverage = if self.total_records > 0 {
1565            (self.metric_statistics.wind_records as f32 / self.total_records as f32) * 100.0
1566        } else {
1567            0.0
1568        };
1569
1570        let wind_range = if let Some((start, end)) = self.temporal_coverage.wind_range {
1571            format!("{}-{}", start.year(), end.year())
1572        } else {
1573            "N/A".to_string()
1574        };
1575
1576        summary.push_str(&format!(
1577            "│ Wind Speed      │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1578            self.metric_statistics.wind_stations,
1579            self.metric_statistics.wind_records,
1580            wind_coverage,
1581            wind_range
1582        ));
1583
1584        summary.push_str(
1585            "└─────────────────┴──────────┴─────────────┴──────────────┴─────────────┘\n\n",
1586        );
1587
1588        // Sample Records (diverse)
1589        if !self.sample_records.is_empty() {
1590            summary.push_str("Sample Records (diverse stations & metrics):\n");
1591            for (i, record) in self.sample_records.iter().enumerate() {
1592                let mut metrics_display = Vec::new();
1593
1594                // Temperature display
1595                let temp_parts: Vec<String> = [
1596                    record.temp_min.map(|t| format!("min={:.1}°C", t)),
1597                    record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
1598                    record.temp_max.map(|t| format!("max={:.1}°C", t)),
1599                ]
1600                .into_iter()
1601                .flatten()
1602                .collect();
1603
1604                if !temp_parts.is_empty() {
1605                    metrics_display.push(format!("temp({})", temp_parts.join(", ")));
1606                }
1607
1608                if let Some(precip) = record.precipitation {
1609                    metrics_display.push(format!("precip={:.1}mm", precip));
1610                }
1611
1612                if let Some(wind) = record.wind_speed {
1613                    metrics_display.push(format!("wind={:.1}m/s", wind));
1614                }
1615
1616                let metrics_str = if metrics_display.is_empty() {
1617                    "no data".to_string()
1618                } else {
1619                    metrics_display.join(", ")
1620                };
1621
1622                summary.push_str(&format!(
1623                    "{}. {} on {}: {}\n",
1624                    i + 1,
1625                    record.station_name,
1626                    record.date,
1627                    metrics_str
1628                ));
1629            }
1630            summary.push('\n');
1631        }
1632
1633        // Extreme Records
1634        summary.push_str("Extreme Records:\n");
1635        if let Some(ref coldest) = self.extreme_records.coldest {
1636            if let Some(min_temp) = coldest.temp_min {
1637                summary.push_str(&format!(
1638                    "- Coldest: {:.1}°C at {} ({})\n",
1639                    min_temp, coldest.station_name, coldest.date
1640                ));
1641            }
1642        }
1643
1644        if let Some(ref hottest) = self.extreme_records.hottest {
1645            if let Some(max_temp) = hottest.temp_max {
1646                summary.push_str(&format!(
1647                    "- Hottest: {:.1}°C at {} ({})\n",
1648                    max_temp, hottest.station_name, hottest.date
1649                ));
1650            }
1651        }
1652
1653        if let Some(ref wettest) = self.extreme_records.wettest {
1654            if let Some(precip) = wettest.precipitation {
1655                summary.push_str(&format!(
1656                    "- Wettest: {:.1}mm at {} ({})\n",
1657                    precip, wettest.station_name, wettest.date
1658                ));
1659            }
1660        }
1661
1662        if let Some(ref windiest) = self.extreme_records.windiest {
1663            if let Some(wind) = windiest.wind_speed {
1664                summary.push_str(&format!(
1665                    "- Windiest: {:.1}m/s at {} ({})\n",
1666                    wind, windiest.station_name, windiest.date
1667                ));
1668            }
1669        }
1670        summary.push('\n');
1671
1672        // Enhanced Data Quality Analysis
1673        summary.push_str("Data Quality Analysis:\n");
1674
1675        let total_ecad = self.data_quality.ecad_valid
1676            + self.data_quality.ecad_suspect
1677            + self.data_quality.ecad_missing;
1678        if total_ecad > 0 {
1679            summary.push_str("├─ ECAD Assessment:\n");
1680            summary.push_str(&format!(
1681                "│  ├─ Valid (flag=0): {} ({:.1}%)\n",
1682                self.data_quality.ecad_valid,
1683                (self.data_quality.ecad_valid as f32 / total_ecad as f32) * 100.0
1684            ));
1685            summary.push_str(&format!(
1686                "│  ├─ Suspect (flag=1): {} ({:.1}%)\n",
1687                self.data_quality.ecad_suspect,
1688                (self.data_quality.ecad_suspect as f32 / total_ecad as f32) * 100.0
1689            ));
1690            summary.push_str(&format!(
1691                "│  └─ Missing (flag=9): {} ({:.1}%)\n",
1692                self.data_quality.ecad_missing,
1693                (self.data_quality.ecad_missing as f32 / total_ecad as f32) * 100.0
1694            ));
1695            summary.push_str("│\n");
1696        }
1697
1698        let total_physical = self.data_quality.physically_valid
1699            + self.data_quality.physically_suspect
1700            + self.data_quality.physically_invalid;
1701        if total_physical > 0 {
1702            summary.push_str("├─ Physical Validation:\n");
1703            summary.push_str(&format!(
1704                "│  ├─ Valid: {} ({:.1}%)\n",
1705                self.data_quality.physically_valid,
1706                (self.data_quality.physically_valid as f32 / total_physical as f32) * 100.0
1707            ));
1708            summary.push_str(&format!(
1709                "│  ├─ Suspect: {} ({:.1}%)\n",
1710                self.data_quality.physically_suspect,
1711                (self.data_quality.physically_suspect as f32 / total_physical as f32) * 100.0
1712            ));
1713            summary.push_str(&format!(
1714                "│  └─ Invalid: {} ({:.3}%)\n",
1715                self.data_quality.physically_invalid,
1716                (self.data_quality.physically_invalid as f32 / total_physical as f32) * 100.0
1717            ));
1718            summary.push_str("│\n");
1719        }
1720
1721        let total_combined = self.data_quality.combined_valid
1722            + self.data_quality.combined_suspect_original
1723            + self.data_quality.combined_suspect_range
1724            + self.data_quality.combined_suspect_both
1725            + self.data_quality.combined_invalid
1726            + self.data_quality.combined_missing;
1727        if total_combined > 0 {
1728            summary.push_str("└─ Combined Quality:\n");
1729            summary.push_str(&format!(
1730                "   ├─ Valid: {} ({:.1}%)\n",
1731                self.data_quality.combined_valid,
1732                (self.data_quality.combined_valid as f32 / total_combined as f32) * 100.0
1733            ));
1734            summary.push_str(&format!(
1735                "   ├─ Suspect (original): {} ({:.1}%)\n",
1736                self.data_quality.combined_suspect_original,
1737                (self.data_quality.combined_suspect_original as f32 / total_combined as f32)
1738                    * 100.0
1739            ));
1740            summary.push_str(&format!(
1741                "   ├─ Suspect (range): {} ({:.2}%)\n",
1742                self.data_quality.combined_suspect_range,
1743                (self.data_quality.combined_suspect_range as f32 / total_combined as f32) * 100.0
1744            ));
1745            if self.data_quality.combined_invalid > 0 {
1746                summary.push_str(&format!(
1747                    "   ├─ Invalid: {} ({:.3}%)\n",
1748                    self.data_quality.combined_invalid,
1749                    (self.data_quality.combined_invalid as f32 / total_combined as f32) * 100.0
1750                ));
1751            }
1752            summary.push_str(&format!(
1753                "   └─ Missing: {} ({:.1}%)\n",
1754                self.data_quality.combined_missing,
1755                (self.data_quality.combined_missing as f32 / total_combined as f32) * 100.0
1756            ));
1757        }
1758
1759        if self.data_quality.physically_invalid > 0 {
1760            summary.push_str(&format!(
1761                "\n⚠️  Found {} physically impossible values that were excluded from extreme records analysis\n",
1762                self.data_quality.physically_invalid
1763            ));
1764        }
1765
1766        summary
1767    }
1768}
1769
1770impl Default for ParquetWriter {
1771    fn default() -> Self {
1772        Self::new()
1773    }
1774}
1775
1776#[derive(Debug)]
1777pub struct ParquetFileInfo {
1778    pub total_rows: i64,
1779    pub row_groups: i32,
1780    pub row_group_sizes: Vec<i64>,
1781    pub file_size: u64,
1782    pub compression: Compression,
1783}
1784
1785impl ParquetFileInfo {
1786    pub fn summary(&self) -> String {
1787        format!(
1788            "Parquet File Summary:\n\
1789            - Total rows: {}\n\
1790            - Row groups: {}\n\
1791            - File size: {:.2} MB\n\
1792            - Compression: {:?}\n\
1793            - Avg rows per group: {:.0}",
1794            self.total_rows,
1795            self.row_groups,
1796            self.file_size as f64 / 1_048_576.0, // Convert to MB
1797            self.compression,
1798            self.total_rows as f64 / self.row_groups as f64
1799        )
1800    }
1801}
1802
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ConsolidatedRecord;
    use chrono::NaiveDate;
    use tempfile::NamedTempFile;

    /// Build the single fixture record shared by the write tests.
    fn sample_record() -> ConsolidatedRecord {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).expect("2023-07-15 is a valid date");
        ConsolidatedRecord::new(
            12345,
            "Test Station".to_string(),
            date,
            51.5074,
            -0.1278,
            15.0,
            25.0,
            20.0,
            "000".to_string(),
        )
    }

    #[test]
    fn test_write_empty_records() {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        let result = writer.write_records(&[], temp_file.path());
        assert!(result.is_ok());

        // `write_records` returns early on empty input, before creating the file,
        // so the (initially empty) temp file must remain empty.
        let metadata = std::fs::metadata(temp_file.path()).unwrap();
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn test_write_single_record() -> Result<()> {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        writer.write_records(&[sample_record()], temp_file.path())?;

        // Verify file was created and has content
        let metadata = std::fs::metadata(temp_file.path())?;
        assert!(metadata.len() > 0);

        Ok(())
    }

    #[test]
    fn test_different_compressions() -> Result<()> {
        let compressions = ["snappy", "gzip", "lz4", "zstd", "none"];

        for compression in &compressions {
            let writer = ParquetWriter::new().with_compression(compression)?;
            let temp_file = NamedTempFile::new().unwrap();

            // Surface the actual error instead of a bare `is_ok()` failure.
            if let Err(err) = writer.write_records(&[sample_record()], temp_file.path()) {
                panic!("Failed with compression {}: {:?}", compression, err);
            }

            let metadata = std::fs::metadata(temp_file.path())?;
            assert!(
                metadata.len() > 0,
                "Empty output with compression: {}",
                compression
            );
        }

        Ok(())
    }

    #[test]
    fn test_compression_name_is_case_insensitive() {
        assert!(ParquetWriter::new().with_compression("ZSTD").is_ok());
        assert!(ParquetWriter::new().with_compression("Snappy").is_ok());
    }

    #[test]
    fn test_unsupported_compression_is_rejected() {
        let result = ParquetWriter::new().with_compression("brotli");
        assert!(matches!(
            result,
            Err(crate::error::ProcessingError::Config(_))
        ));
    }
}