ecad_processor/writers/
parquet_writer.rs

1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, WeatherRecord};
3use crate::utils::constants::DEFAULT_ROW_GROUP_SIZE;
4use arrow::array::*;
5use arrow::datatypes::{DataType, Field, Schema};
6use arrow::record_batch::RecordBatch;
7use chrono::Datelike;
8use parquet::arrow::ArrowWriter;
9use parquet::basic::{Compression, GzipLevel};
10use parquet::file::properties::WriterProperties;
11use std::fs::File;
12use std::path::Path;
13use std::sync::Arc;
14
/// Writes weather data to Parquet files and reads samples/metadata back.
///
/// Configure an instance with the builder-style `with_*` methods.
pub struct ParquetWriter {
    /// Compression codec handed to the Parquet writer properties.
    compression: Compression,
    /// Value handed to `set_max_row_group_size` when building writer properties.
    row_group_size: usize,
}
19
20impl ParquetWriter {
21    pub fn new() -> Self {
22        Self {
23            compression: Compression::SNAPPY,
24            row_group_size: DEFAULT_ROW_GROUP_SIZE,
25        }
26    }
27
28    pub fn with_compression(mut self, compression: &str) -> Result<Self> {
29        self.compression = match compression.to_lowercase().as_str() {
30            "snappy" => Compression::SNAPPY,
31            "gzip" => Compression::GZIP(GzipLevel::default()),
32            "lz4" => Compression::LZ4,
33            "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
34            "none" => Compression::UNCOMPRESSED,
35            _ => {
36                return Err(crate::error::ProcessingError::Config(format!(
37                    "Unsupported compression: {}",
38                    compression
39                )))
40            }
41        };
42        Ok(self)
43    }
44
45    pub fn with_row_group_size(mut self, size: usize) -> Self {
46        self.row_group_size = size;
47        self
48    }
49
50    /// Write consolidated records to Parquet file
51    pub fn write_records(&self, records: &[ConsolidatedRecord], path: &Path) -> Result<()> {
52        if records.is_empty() {
53            return Ok(());
54        }
55
56        let schema = self.create_schema();
57        let batch = self.records_to_batch(records, schema.clone())?;
58
59        let file = File::create(path)?;
60        let props = WriterProperties::builder()
61            .set_compression(self.compression)
62            .set_max_row_group_size(self.row_group_size)
63            .build();
64
65        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
66        writer.write(&batch)?;
67        writer.close()?;
68
69        Ok(())
70    }
71
72    /// Write records in batches for memory efficiency
73    pub fn write_records_batched(
74        &self,
75        records: &[ConsolidatedRecord],
76        path: &Path,
77        batch_size: usize,
78    ) -> Result<()> {
79        if records.is_empty() {
80            return Ok(());
81        }
82
83        let schema = self.create_schema();
84        let file = File::create(path)?;
85        let props = WriterProperties::builder()
86            .set_compression(self.compression)
87            .set_max_row_group_size(self.row_group_size)
88            .build();
89
90        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
91
92        // Write in batches
93        for chunk in records.chunks(batch_size) {
94            let batch = self.records_to_batch(chunk, schema.clone())?;
95            writer.write(&batch)?;
96        }
97
98        writer.close()?;
99        Ok(())
100    }
101
102    /// Create Arrow schema for temperature data
103    fn create_schema(&self) -> Arc<Schema> {
104        let fields = vec![
105            Field::new("station_id", DataType::UInt32, false),
106            Field::new("station_name", DataType::Utf8, false),
107            Field::new("date", DataType::Date32, false),
108            Field::new("latitude", DataType::Float64, false),
109            Field::new("longitude", DataType::Float64, false),
110            Field::new("min_temp", DataType::Float32, false),
111            Field::new("max_temp", DataType::Float32, false),
112            Field::new("avg_temp", DataType::Float32, false),
113            Field::new("quality_flags", DataType::Utf8, false),
114        ];
115
116        Arc::new(Schema::new(fields))
117    }
118
119    /// Convert records to Arrow RecordBatch
120    fn records_to_batch(
121        &self,
122        records: &[ConsolidatedRecord],
123        schema: Arc<Schema>,
124    ) -> Result<RecordBatch> {
125        // Extract data into separate vectors
126        let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
127        let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
128        let dates: Vec<i32> = records
129            .iter()
130            .map(|r| {
131                r.date
132                    .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
133                    .num_days() as i32
134            })
135            .collect();
136        let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
137        let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
138        let min_temps: Vec<f32> = records.iter().map(|r| r.min_temp).collect();
139        let max_temps: Vec<f32> = records.iter().map(|r| r.max_temp).collect();
140        let avg_temps: Vec<f32> = records.iter().map(|r| r.avg_temp).collect();
141        let quality_flags: Vec<String> = records.iter().map(|r| r.quality_flags.clone()).collect();
142
143        // Create Arrow arrays
144        let station_id_array = Arc::new(UInt32Array::from(station_ids));
145        let station_name_array = Arc::new(StringArray::from(station_names));
146        let date_array = Arc::new(Date32Array::from(dates));
147        let latitude_array = Arc::new(Float64Array::from(latitudes));
148        let longitude_array = Arc::new(Float64Array::from(longitudes));
149        let min_temp_array = Arc::new(Float32Array::from(min_temps));
150        let max_temp_array = Arc::new(Float32Array::from(max_temps));
151        let avg_temp_array = Arc::new(Float32Array::from(avg_temps));
152        let quality_flags_array = Arc::new(StringArray::from(quality_flags));
153
154        // Create record batch
155        let batch = RecordBatch::try_new(
156            schema,
157            vec![
158                station_id_array,
159                station_name_array,
160                date_array,
161                latitude_array,
162                longitude_array,
163                min_temp_array,
164                max_temp_array,
165                avg_temp_array,
166                quality_flags_array,
167            ],
168        )?;
169
170        Ok(batch)
171    }
172
173    /// Read sample records from Parquet file
174    pub fn read_sample_records(
175        &self,
176        path: &Path,
177        limit: usize,
178    ) -> Result<Vec<ConsolidatedRecord>> {
179        use arrow::array::*;
180        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
181
182        let file = File::open(path)?;
183        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
184            .with_batch_size(limit.min(8192))
185            .build()?;
186
187        let mut records = Vec::new();
188        let mut total_read = 0;
189
190        for batch_result in parquet_reader {
191            let batch = batch_result?;
192
193            // Extract arrays from the batch
194            let station_ids = batch
195                .column(0)
196                .as_any()
197                .downcast_ref::<UInt32Array>()
198                .ok_or_else(|| {
199                    crate::error::ProcessingError::Config(
200                        "Invalid station_id column type".to_string(),
201                    )
202                })?;
203            let station_names = batch
204                .column(1)
205                .as_any()
206                .downcast_ref::<StringArray>()
207                .ok_or_else(|| {
208                    crate::error::ProcessingError::Config(
209                        "Invalid station_name column type".to_string(),
210                    )
211                })?;
212            let dates = batch
213                .column(2)
214                .as_any()
215                .downcast_ref::<Date32Array>()
216                .ok_or_else(|| {
217                    crate::error::ProcessingError::Config("Invalid date column type".to_string())
218                })?;
219            let latitudes = batch
220                .column(3)
221                .as_any()
222                .downcast_ref::<Float64Array>()
223                .ok_or_else(|| {
224                    crate::error::ProcessingError::Config(
225                        "Invalid latitude column type".to_string(),
226                    )
227                })?;
228            let longitudes = batch
229                .column(4)
230                .as_any()
231                .downcast_ref::<Float64Array>()
232                .ok_or_else(|| {
233                    crate::error::ProcessingError::Config(
234                        "Invalid longitude column type".to_string(),
235                    )
236                })?;
237            let min_temps = batch
238                .column(5)
239                .as_any()
240                .downcast_ref::<Float32Array>()
241                .ok_or_else(|| {
242                    crate::error::ProcessingError::Config(
243                        "Invalid min_temp column type".to_string(),
244                    )
245                })?;
246            let max_temps = batch
247                .column(6)
248                .as_any()
249                .downcast_ref::<Float32Array>()
250                .ok_or_else(|| {
251                    crate::error::ProcessingError::Config(
252                        "Invalid max_temp column type".to_string(),
253                    )
254                })?;
255            let avg_temps = batch
256                .column(7)
257                .as_any()
258                .downcast_ref::<Float32Array>()
259                .ok_or_else(|| {
260                    crate::error::ProcessingError::Config(
261                        "Invalid avg_temp column type".to_string(),
262                    )
263                })?;
264            let quality_flags = batch
265                .column(8)
266                .as_any()
267                .downcast_ref::<StringArray>()
268                .ok_or_else(|| {
269                    crate::error::ProcessingError::Config(
270                        "Invalid quality_flags column type".to_string(),
271                    )
272                })?;
273
274            // Convert to ConsolidatedRecord objects
275            let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
276
277            for i in 0..batch_records_to_read {
278                let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
279                    + chrono::Duration::days(dates.value(i) as i64);
280
281                let record = ConsolidatedRecord::new(
282                    station_ids.value(i),
283                    station_names.value(i).to_string(),
284                    date,
285                    latitudes.value(i),
286                    longitudes.value(i),
287                    min_temps.value(i),
288                    max_temps.value(i),
289                    avg_temps.value(i),
290                    quality_flags.value(i).to_string(),
291                );
292
293                records.push(record);
294                total_read += 1;
295
296                if total_read >= limit {
297                    break;
298                }
299            }
300
301            if total_read >= limit {
302                break;
303            }
304        }
305
306        Ok(records)
307    }
308
309    /// Get file statistics
310    pub fn get_file_info(&self, path: &Path) -> Result<ParquetFileInfo> {
311        use parquet::file::reader::{FileReader, SerializedFileReader};
312        use std::fs::File;
313
314        let file = File::open(path)?;
315        let reader = SerializedFileReader::new(file)?;
316        let metadata = reader.metadata();
317
318        let file_metadata = metadata.file_metadata();
319        let row_groups = metadata.num_row_groups();
320        let total_rows = file_metadata.num_rows();
321        let file_size = std::fs::metadata(path)?.len();
322
323        let mut row_group_sizes = Vec::new();
324        for i in 0..row_groups {
325            let rg_metadata = metadata.row_group(i);
326            row_group_sizes.push(rg_metadata.num_rows());
327        }
328
329        Ok(ParquetFileInfo {
330            total_rows,
331            row_groups: row_groups as i32,
332            row_group_sizes,
333            file_size,
334            compression: self.compression,
335        })
336    }
337
338    /// Write weather records to Parquet file with optional fields
339    pub fn write_weather_records(&self, records: &[WeatherRecord], path: &Path) -> Result<()> {
340        if records.is_empty() {
341            return Ok(());
342        }
343
344        let schema = self.create_weather_schema();
345        let batch = self.weather_records_to_batch(records, schema.clone())?;
346
347        let file = File::create(path)?;
348        let props = WriterProperties::builder()
349            .set_compression(self.compression)
350            .set_max_row_group_size(self.row_group_size)
351            .build();
352
353        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
354        writer.write(&batch)?;
355        writer.close()?;
356
357        Ok(())
358    }
359
360    /// Write weather records in batches for memory efficiency
361    pub fn write_weather_records_batched(
362        &self,
363        records: &[WeatherRecord],
364        path: &Path,
365        batch_size: usize,
366    ) -> Result<()> {
367        if records.is_empty() {
368            return Ok(());
369        }
370
371        let schema = self.create_weather_schema();
372        let file = File::create(path)?;
373        let props = WriterProperties::builder()
374            .set_compression(self.compression)
375            .set_max_row_group_size(self.row_group_size)
376            .build();
377
378        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
379
380        // Write in batches
381        for chunk in records.chunks(batch_size) {
382            let batch = self.weather_records_to_batch(chunk, schema.clone())?;
383            writer.write(&batch)?;
384        }
385
386        writer.close()?;
387        Ok(())
388    }
389
390    /// Create Arrow schema for multi-metric weather data
391    fn create_weather_schema(&self) -> Arc<Schema> {
392        let fields = vec![
393            Field::new("station_id", DataType::UInt32, false),
394            Field::new("station_name", DataType::Utf8, false),
395            Field::new("date", DataType::Date32, false),
396            Field::new("latitude", DataType::Float64, false),
397            Field::new("longitude", DataType::Float64, false),
398            // Optional temperature fields
399            Field::new("temp_min", DataType::Float32, true),
400            Field::new("temp_max", DataType::Float32, true),
401            Field::new("temp_avg", DataType::Float32, true),
402            // Optional precipitation field
403            Field::new("precipitation", DataType::Float32, true),
404            // Optional wind speed field
405            Field::new("wind_speed", DataType::Float32, true),
406            // Quality flag fields (original ECAD)
407            Field::new("temp_quality", DataType::Utf8, true),
408            Field::new("precip_quality", DataType::Utf8, true),
409            Field::new("wind_quality", DataType::Utf8, true),
410            // Physical validation fields
411            Field::new("temp_validation", DataType::Utf8, true),
412            Field::new("precip_validation", DataType::Utf8, true),
413            Field::new("wind_validation", DataType::Utf8, true),
414        ];
415
416        Arc::new(Schema::new(fields))
417    }
418
419    /// Convert weather records to Arrow RecordBatch
420    fn weather_records_to_batch(
421        &self,
422        records: &[WeatherRecord],
423        schema: Arc<Schema>,
424    ) -> Result<RecordBatch> {
425        // Extract data into separate vectors
426        let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
427        let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
428        let dates: Vec<i32> = records
429            .iter()
430            .map(|r| {
431                r.date
432                    .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
433                    .num_days() as i32
434            })
435            .collect();
436        let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
437        let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
438
439        // Temperature data (optional)
440        let temp_mins: Vec<Option<f32>> = records.iter().map(|r| r.temp_min).collect();
441        let temp_maxs: Vec<Option<f32>> = records.iter().map(|r| r.temp_max).collect();
442        let temp_avgs: Vec<Option<f32>> = records.iter().map(|r| r.temp_avg).collect();
443
444        // Other weather metrics (optional)
445        let precipitations: Vec<Option<f32>> = records.iter().map(|r| r.precipitation).collect();
446        let wind_speeds: Vec<Option<f32>> = records.iter().map(|r| r.wind_speed).collect();
447
448        // Quality flags (optional)
449        let temp_qualities: Vec<Option<String>> =
450            records.iter().map(|r| r.temp_quality.clone()).collect();
451        let precip_qualities: Vec<Option<String>> =
452            records.iter().map(|r| r.precip_quality.clone()).collect();
453        let wind_qualities: Vec<Option<String>> =
454            records.iter().map(|r| r.wind_quality.clone()).collect();
455
456        // Physical validation flags (optional)
457        let temp_validations: Vec<Option<String>> = records
458            .iter()
459            .map(|r| r.temp_validation.map(|v| format!("{:?}", v)))
460            .collect();
461        let precip_validations: Vec<Option<String>> = records
462            .iter()
463            .map(|r| r.precip_validation.map(|v| format!("{:?}", v)))
464            .collect();
465        let wind_validations: Vec<Option<String>> = records
466            .iter()
467            .map(|r| r.wind_validation.map(|v| format!("{:?}", v)))
468            .collect();
469
470        // Create Arrow arrays
471        let station_id_array = Arc::new(UInt32Array::from(station_ids));
472        let station_name_array = Arc::new(StringArray::from(station_names));
473        let date_array = Arc::new(Date32Array::from(dates));
474        let latitude_array = Arc::new(Float64Array::from(latitudes));
475        let longitude_array = Arc::new(Float64Array::from(longitudes));
476
477        // Create optional arrays
478        let temp_min_array = Arc::new(Float32Array::from(temp_mins));
479        let temp_max_array = Arc::new(Float32Array::from(temp_maxs));
480        let temp_avg_array = Arc::new(Float32Array::from(temp_avgs));
481        let precipitation_array = Arc::new(Float32Array::from(precipitations));
482        let wind_speed_array = Arc::new(Float32Array::from(wind_speeds));
483
484        let temp_quality_array = Arc::new(StringArray::from(temp_qualities));
485        let precip_quality_array = Arc::new(StringArray::from(precip_qualities));
486        let wind_quality_array = Arc::new(StringArray::from(wind_qualities));
487
488        let temp_validation_array = Arc::new(StringArray::from(temp_validations));
489        let precip_validation_array = Arc::new(StringArray::from(precip_validations));
490        let wind_validation_array = Arc::new(StringArray::from(wind_validations));
491
492        // Create record batch
493        let batch = RecordBatch::try_new(
494            schema,
495            vec![
496                station_id_array,
497                station_name_array,
498                date_array,
499                latitude_array,
500                longitude_array,
501                temp_min_array,
502                temp_max_array,
503                temp_avg_array,
504                precipitation_array,
505                wind_speed_array,
506                temp_quality_array,
507                precip_quality_array,
508                wind_quality_array,
509                temp_validation_array,
510                precip_validation_array,
511                wind_validation_array,
512            ],
513        )?;
514
515        Ok(batch)
516    }
517
518    /// Read sample weather records from Parquet file
519    pub fn read_sample_weather_records(
520        &self,
521        path: &Path,
522        limit: usize,
523    ) -> Result<Vec<WeatherRecord>> {
524        use arrow::array::*;
525        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
526
527        let file = File::open(path)?;
528        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
529            .with_batch_size(limit.min(8192))
530            .build()?;
531
532        let mut records = Vec::new();
533        let mut total_read = 0;
534
535        for batch_result in parquet_reader {
536            let batch = batch_result?;
537
538            // Extract arrays from the batch - handle both old and new schema
539            let num_columns = batch.num_columns();
540
541            if num_columns < 13 {
542                // Old schema format - return empty for now
543                return Ok(Vec::new());
544            }
545
546            let has_validation_fields = num_columns >= 16;
547
548            // New WeatherRecord schema
549            let station_ids = batch
550                .column(0)
551                .as_any()
552                .downcast_ref::<UInt32Array>()
553                .ok_or_else(|| {
554                    crate::error::ProcessingError::Config(
555                        "Invalid station_id column type".to_string(),
556                    )
557                })?;
558            let station_names = batch
559                .column(1)
560                .as_any()
561                .downcast_ref::<StringArray>()
562                .ok_or_else(|| {
563                    crate::error::ProcessingError::Config(
564                        "Invalid station_name column type".to_string(),
565                    )
566                })?;
567            let dates = batch
568                .column(2)
569                .as_any()
570                .downcast_ref::<Date32Array>()
571                .ok_or_else(|| {
572                    crate::error::ProcessingError::Config("Invalid date column type".to_string())
573                })?;
574            let latitudes = batch
575                .column(3)
576                .as_any()
577                .downcast_ref::<Float64Array>()
578                .ok_or_else(|| {
579                    crate::error::ProcessingError::Config(
580                        "Invalid latitude column type".to_string(),
581                    )
582                })?;
583            let longitudes = batch
584                .column(4)
585                .as_any()
586                .downcast_ref::<Float64Array>()
587                .ok_or_else(|| {
588                    crate::error::ProcessingError::Config(
589                        "Invalid longitude column type".to_string(),
590                    )
591                })?;
592
593            // Optional temperature fields
594            let temp_mins = batch
595                .column(5)
596                .as_any()
597                .downcast_ref::<Float32Array>()
598                .ok_or_else(|| {
599                    crate::error::ProcessingError::Config(
600                        "Invalid temp_min column type".to_string(),
601                    )
602                })?;
603            let temp_maxs = batch
604                .column(6)
605                .as_any()
606                .downcast_ref::<Float32Array>()
607                .ok_or_else(|| {
608                    crate::error::ProcessingError::Config(
609                        "Invalid temp_max column type".to_string(),
610                    )
611                })?;
612            let temp_avgs = batch
613                .column(7)
614                .as_any()
615                .downcast_ref::<Float32Array>()
616                .ok_or_else(|| {
617                    crate::error::ProcessingError::Config(
618                        "Invalid temp_avg column type".to_string(),
619                    )
620                })?;
621
622            // Optional other weather metrics
623            let precipitations = batch
624                .column(8)
625                .as_any()
626                .downcast_ref::<Float32Array>()
627                .ok_or_else(|| {
628                    crate::error::ProcessingError::Config(
629                        "Invalid precipitation column type".to_string(),
630                    )
631                })?;
632            let wind_speeds = batch
633                .column(9)
634                .as_any()
635                .downcast_ref::<Float32Array>()
636                .ok_or_else(|| {
637                    crate::error::ProcessingError::Config(
638                        "Invalid wind_speed column type".to_string(),
639                    )
640                })?;
641
642            // Optional quality flags
643            let temp_qualities = batch
644                .column(10)
645                .as_any()
646                .downcast_ref::<StringArray>()
647                .ok_or_else(|| {
648                    crate::error::ProcessingError::Config(
649                        "Invalid temp_quality column type".to_string(),
650                    )
651                })?;
652            let precip_qualities = batch
653                .column(11)
654                .as_any()
655                .downcast_ref::<StringArray>()
656                .ok_or_else(|| {
657                    crate::error::ProcessingError::Config(
658                        "Invalid precip_quality column type".to_string(),
659                    )
660                })?;
661            let wind_qualities = batch
662                .column(12)
663                .as_any()
664                .downcast_ref::<StringArray>()
665                .ok_or_else(|| {
666                    crate::error::ProcessingError::Config(
667                        "Invalid wind_quality column type".to_string(),
668                    )
669                })?;
670
671            // Optional validation fields (new schema)
672            let temp_validations = if has_validation_fields {
673                Some(
674                    batch
675                        .column(13)
676                        .as_any()
677                        .downcast_ref::<StringArray>()
678                        .ok_or_else(|| {
679                            crate::error::ProcessingError::Config(
680                                "Invalid temp_validation column type".to_string(),
681                            )
682                        })?,
683                )
684            } else {
685                None
686            };
687            let precip_validations = if has_validation_fields {
688                Some(
689                    batch
690                        .column(14)
691                        .as_any()
692                        .downcast_ref::<StringArray>()
693                        .ok_or_else(|| {
694                            crate::error::ProcessingError::Config(
695                                "Invalid precip_validation column type".to_string(),
696                            )
697                        })?,
698                )
699            } else {
700                None
701            };
702            let wind_validations = if has_validation_fields {
703                Some(
704                    batch
705                        .column(15)
706                        .as_any()
707                        .downcast_ref::<StringArray>()
708                        .ok_or_else(|| {
709                            crate::error::ProcessingError::Config(
710                                "Invalid wind_validation column type".to_string(),
711                            )
712                        })?,
713                )
714            } else {
715                None
716            };
717
718            // Convert to WeatherRecord objects
719            let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
720
721            for i in 0..batch_records_to_read {
722                let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
723                    + chrono::Duration::days(dates.value(i) as i64);
724
725                use crate::models::weather::PhysicalValidity;
726
727                let record = WeatherRecord::new_raw(
728                    station_ids.value(i),
729                    station_names.value(i).to_string(),
730                    date,
731                    latitudes.value(i),
732                    longitudes.value(i),
733                    if temp_mins.is_null(i) {
734                        None
735                    } else {
736                        Some(temp_mins.value(i))
737                    },
738                    if temp_maxs.is_null(i) {
739                        None
740                    } else {
741                        Some(temp_maxs.value(i))
742                    },
743                    if temp_avgs.is_null(i) {
744                        None
745                    } else {
746                        Some(temp_avgs.value(i))
747                    },
748                    if precipitations.is_null(i) {
749                        None
750                    } else {
751                        Some(precipitations.value(i))
752                    },
753                    if wind_speeds.is_null(i) {
754                        None
755                    } else {
756                        Some(wind_speeds.value(i))
757                    },
758                    if temp_qualities.is_null(i) {
759                        None
760                    } else {
761                        Some(temp_qualities.value(i).to_string())
762                    },
763                    if precip_qualities.is_null(i) {
764                        None
765                    } else {
766                        Some(precip_qualities.value(i).to_string())
767                    },
768                    if wind_qualities.is_null(i) {
769                        None
770                    } else {
771                        Some(wind_qualities.value(i).to_string())
772                    },
773                    // Parse validation fields if available
774                    temp_validations.and_then(|arr| {
775                        if arr.is_null(i) {
776                            None
777                        } else {
778                            PhysicalValidity::parse(arr.value(i))
779                        }
780                    }),
781                    precip_validations.and_then(|arr| {
782                        if arr.is_null(i) {
783                            None
784                        } else {
785                            PhysicalValidity::parse(arr.value(i))
786                        }
787                    }),
788                    wind_validations.and_then(|arr| {
789                        if arr.is_null(i) {
790                            None
791                        } else {
792                            PhysicalValidity::parse(arr.value(i))
793                        }
794                    }),
795                );
796
797                records.push(record);
798                total_read += 1;
799
800                if total_read >= limit {
801                    break;
802                }
803            }
804
805            if total_read >= limit {
806                break;
807            }
808        }
809
810        Ok(records)
811    }
812
813    /// Detect the schema type of a Parquet file
814    pub fn detect_schema_type(&self, path: &Path) -> Result<SchemaType> {
815        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
816
817        let file = File::open(path)?;
818        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
819        let schema = builder.schema();
820
821        // Check number of columns to determine schema type
822        let num_columns = schema.fields().len();
823
824        if num_columns == 9 {
825            // Old ConsolidatedRecord schema: station_id, station_name, date, lat, lon, min_temp, max_temp, avg_temp, quality_flags
826            Ok(SchemaType::ConsolidatedRecord)
827        } else if num_columns == 13 || num_columns == 16 {
828            // WeatherRecord schema: 13 cols = original, 16 cols = with validation fields
829            Ok(SchemaType::WeatherRecord)
830        } else {
831            Ok(SchemaType::Unknown)
832        }
833    }
834
835    /// Analyze a WeatherRecord Parquet file comprehensively
836    pub fn analyze_weather_dataset(
837        &self,
838        path: &Path,
839        sample_size: usize,
840    ) -> Result<WeatherDatasetSummary> {
841        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
842        use std::collections::HashSet;
843
844        let file = File::open(path)?;
845        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
846
847        let mut total_records = 0;
848        let mut stations: HashSet<u32> = HashSet::new();
849        let mut all_records = Vec::new();
850
851        // Bounds tracking
852        let mut min_lat = f64::MAX;
853        let mut max_lat = f64::MIN;
854        let mut min_lon = f64::MAX;
855        let mut max_lon = f64::MIN;
856        let mut min_date = None;
857        let mut max_date = None;
858
859        // Metric statistics
860        let mut temp_records = 0;
861        let mut temp_stations: HashSet<u32> = HashSet::new();
862        let mut temp_dates = Vec::new();
863        let mut precip_records = 0;
864        let mut precip_stations: HashSet<u32> = HashSet::new();
865        let mut precip_dates = Vec::new();
866        let mut wind_records = 0;
867        let mut wind_stations: HashSet<u32> = HashSet::new();
868        let mut wind_dates = Vec::new();
869
870        // Extreme tracking
871        let mut coldest_record: Option<WeatherRecord> = None;
872        let mut hottest_record: Option<WeatherRecord> = None;
873        let mut wettest_record: Option<WeatherRecord> = None;
874        let mut windiest_record: Option<WeatherRecord> = None;
875        let mut min_temp_val = f32::MAX;
876        let mut max_temp_val = f32::MIN;
877        let mut max_precip_val = f32::MIN;
878        let mut max_wind_val = f32::MIN;
879
880        // Enhanced data quality tracking
881        let mut ecad_valid = 0;
882        let mut ecad_suspect = 0;
883        let mut ecad_missing = 0;
884        let mut physically_valid = 0;
885        let mut physically_suspect = 0;
886        let mut physically_invalid = 0;
887        let mut combined_valid = 0;
888        let mut combined_suspect_original = 0;
889        let mut combined_suspect_range = 0;
890        let mut combined_suspect_both = 0;
891        let mut combined_invalid = 0;
892        let mut combined_missing = 0;
893
894        for batch_result in parquet_reader {
895            let batch = batch_result?;
896            let num_rows = batch.num_rows();
897
898            if batch.num_columns() < 13 {
899                continue; // Skip if not WeatherRecord format
900            }
901
902            let has_validation_fields = batch.num_columns() >= 16;
903
904            // Extract arrays
905            let station_ids = batch
906                .column(0)
907                .as_any()
908                .downcast_ref::<UInt32Array>()
909                .unwrap();
910            let station_names = batch
911                .column(1)
912                .as_any()
913                .downcast_ref::<StringArray>()
914                .unwrap();
915            let dates = batch
916                .column(2)
917                .as_any()
918                .downcast_ref::<Date32Array>()
919                .unwrap();
920            let latitudes = batch
921                .column(3)
922                .as_any()
923                .downcast_ref::<Float64Array>()
924                .unwrap();
925            let longitudes = batch
926                .column(4)
927                .as_any()
928                .downcast_ref::<Float64Array>()
929                .unwrap();
930            let temp_mins = batch
931                .column(5)
932                .as_any()
933                .downcast_ref::<Float32Array>()
934                .unwrap();
935            let temp_maxs = batch
936                .column(6)
937                .as_any()
938                .downcast_ref::<Float32Array>()
939                .unwrap();
940            let temp_avgs = batch
941                .column(7)
942                .as_any()
943                .downcast_ref::<Float32Array>()
944                .unwrap();
945            let precipitations = batch
946                .column(8)
947                .as_any()
948                .downcast_ref::<Float32Array>()
949                .unwrap();
950            let wind_speeds = batch
951                .column(9)
952                .as_any()
953                .downcast_ref::<Float32Array>()
954                .unwrap();
955            let temp_qualities = batch
956                .column(10)
957                .as_any()
958                .downcast_ref::<StringArray>()
959                .unwrap();
960            let precip_qualities = batch
961                .column(11)
962                .as_any()
963                .downcast_ref::<StringArray>()
964                .unwrap();
965            let wind_qualities = batch
966                .column(12)
967                .as_any()
968                .downcast_ref::<StringArray>()
969                .unwrap();
970
971            // Optional validation fields
972            let temp_validations = if has_validation_fields {
973                Some(
974                    batch
975                        .column(13)
976                        .as_any()
977                        .downcast_ref::<StringArray>()
978                        .unwrap(),
979                )
980            } else {
981                None
982            };
983            let precip_validations = if has_validation_fields {
984                Some(
985                    batch
986                        .column(14)
987                        .as_any()
988                        .downcast_ref::<StringArray>()
989                        .unwrap(),
990                )
991            } else {
992                None
993            };
994            let wind_validations = if has_validation_fields {
995                Some(
996                    batch
997                        .column(15)
998                        .as_any()
999                        .downcast_ref::<StringArray>()
1000                        .unwrap(),
1001                )
1002            } else {
1003                None
1004            };
1005
1006            for i in 0..num_rows {
1007                total_records += 1;
1008                let station_id = station_ids.value(i);
1009                stations.insert(station_id);
1010
1011                let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
1012                    + chrono::Duration::days(dates.value(i) as i64);
1013
1014                // Update date bounds
1015                min_date = Some(min_date.map_or(date, |d: chrono::NaiveDate| d.min(date)));
1016                max_date = Some(max_date.map_or(date, |d: chrono::NaiveDate| d.max(date)));
1017
1018                // Update geographic bounds
1019                let lat = latitudes.value(i);
1020                let lon = longitudes.value(i);
1021                min_lat = min_lat.min(lat);
1022                max_lat = max_lat.max(lat);
1023                min_lon = min_lon.min(lon);
1024                max_lon = max_lon.max(lon);
1025
1026                // Create record for sampling and analysis
1027                use crate::models::weather::{DataQuality, PhysicalValidity};
1028
1029                let record = WeatherRecord::new_raw(
1030                    station_id,
1031                    station_names.value(i).to_string(),
1032                    date,
1033                    lat,
1034                    lon,
1035                    if temp_mins.is_null(i) {
1036                        None
1037                    } else {
1038                        Some(temp_mins.value(i))
1039                    },
1040                    if temp_maxs.is_null(i) {
1041                        None
1042                    } else {
1043                        Some(temp_maxs.value(i))
1044                    },
1045                    if temp_avgs.is_null(i) {
1046                        None
1047                    } else {
1048                        Some(temp_avgs.value(i))
1049                    },
1050                    if precipitations.is_null(i) {
1051                        None
1052                    } else {
1053                        Some(precipitations.value(i))
1054                    },
1055                    if wind_speeds.is_null(i) {
1056                        None
1057                    } else {
1058                        Some(wind_speeds.value(i))
1059                    },
1060                    if temp_qualities.is_null(i) {
1061                        None
1062                    } else {
1063                        Some(temp_qualities.value(i).to_string())
1064                    },
1065                    if precip_qualities.is_null(i) {
1066                        None
1067                    } else {
1068                        Some(precip_qualities.value(i).to_string())
1069                    },
1070                    if wind_qualities.is_null(i) {
1071                        None
1072                    } else {
1073                        Some(wind_qualities.value(i).to_string())
1074                    },
1075                    // Parse validation fields if available
1076                    temp_validations.and_then(|arr| {
1077                        if arr.is_null(i) {
1078                            None
1079                        } else {
1080                            PhysicalValidity::parse(arr.value(i))
1081                        }
1082                    }),
1083                    precip_validations.and_then(|arr| {
1084                        if arr.is_null(i) {
1085                            None
1086                        } else {
1087                            PhysicalValidity::parse(arr.value(i))
1088                        }
1089                    }),
1090                    wind_validations.and_then(|arr| {
1091                        if arr.is_null(i) {
1092                            None
1093                        } else {
1094                            PhysicalValidity::parse(arr.value(i))
1095                        }
1096                    }),
1097                );
1098
1099                // Track metrics
1100                if record.has_temperature_data() {
1101                    temp_records += 1;
1102                    temp_stations.insert(station_id);
1103                    temp_dates.push(date);
1104
1105                    // Track extreme temperatures (exclude invalid values)
1106                    let temp_quality = record.assess_temperature_quality();
1107                    if !matches!(temp_quality, DataQuality::Invalid) {
1108                        if let Some(min_temp) = record.temp_min {
1109                            if min_temp < min_temp_val {
1110                                min_temp_val = min_temp;
1111                                coldest_record = Some(record.clone());
1112                            }
1113                        }
1114
1115                        if let Some(max_temp) = record.temp_max {
1116                            if max_temp > max_temp_val {
1117                                max_temp_val = max_temp;
1118                                hottest_record = Some(record.clone());
1119                            }
1120                        }
1121                    }
1122                }
1123
1124                if record.has_precipitation() {
1125                    precip_records += 1;
1126                    precip_stations.insert(station_id);
1127                    precip_dates.push(date);
1128
1129                    // Track extreme precipitation (exclude invalid values)
1130                    let precip_quality = record.assess_precipitation_quality();
1131                    if !matches!(precip_quality, DataQuality::Invalid) {
1132                        if let Some(precip) = record.precipitation {
1133                            if precip > max_precip_val {
1134                                max_precip_val = precip;
1135                                wettest_record = Some(record.clone());
1136                            }
1137                        }
1138                    }
1139                }
1140
1141                if record.has_wind_speed() {
1142                    wind_records += 1;
1143                    wind_stations.insert(station_id);
1144                    wind_dates.push(date);
1145
1146                    // Track extreme wind speed (exclude invalid values)
1147                    let wind_quality = record.assess_wind_quality();
1148                    if !matches!(wind_quality, DataQuality::Invalid) {
1149                        if let Some(wind) = record.wind_speed {
1150                            if wind > max_wind_val {
1151                                max_wind_val = wind;
1152                                windiest_record = Some(record.clone());
1153                            }
1154                        }
1155                    }
1156                }
1157
1158                // Enhanced data quality analysis
1159
1160                // Track ECAD quality flags for each metric present
1161
1162                // Temperature ECAD flags
1163                if record.has_temperature_data() {
1164                    if let Some(temp_quality) = &record.temp_quality {
1165                        if temp_quality.contains('0') {
1166                            ecad_valid += 1;
1167                        } else if temp_quality.contains('1') {
1168                            ecad_suspect += 1;
1169                        } else if temp_quality.contains('9') {
1170                            ecad_missing += 1;
1171                        }
1172                    }
1173                }
1174
1175                // Precipitation ECAD flags
1176                if record.has_precipitation() {
1177                    if let Some(precip_quality) = &record.precip_quality {
1178                        if precip_quality == "0" {
1179                            ecad_valid += 1;
1180                        } else if precip_quality == "1" {
1181                            ecad_suspect += 1;
1182                        } else if precip_quality == "9" {
1183                            ecad_missing += 1;
1184                        }
1185                    }
1186                }
1187
1188                // Wind speed ECAD flags
1189                if record.has_wind_speed() {
1190                    if let Some(wind_quality) = &record.wind_quality {
1191                        if wind_quality == "0" {
1192                            ecad_valid += 1;
1193                        } else if wind_quality == "1" {
1194                            ecad_suspect += 1;
1195                        } else if wind_quality == "9" {
1196                            ecad_missing += 1;
1197                        }
1198                    }
1199                }
1200
1201                // Track physical validation for each metric present
1202                if let Some(validation) = record.temp_validation {
1203                    match validation {
1204                        PhysicalValidity::Valid => physically_valid += 1,
1205                        PhysicalValidity::Suspect => physically_suspect += 1,
1206                        PhysicalValidity::Invalid => physically_invalid += 1,
1207                    }
1208                }
1209                if let Some(validation) = record.precip_validation {
1210                    match validation {
1211                        PhysicalValidity::Valid => physically_valid += 1,
1212                        PhysicalValidity::Suspect => physically_suspect += 1,
1213                        PhysicalValidity::Invalid => physically_invalid += 1,
1214                    }
1215                }
1216                if let Some(validation) = record.wind_validation {
1217                    match validation {
1218                        PhysicalValidity::Valid => physically_valid += 1,
1219                        PhysicalValidity::Suspect => physically_suspect += 1,
1220                        PhysicalValidity::Invalid => physically_invalid += 1,
1221                    }
1222                }
1223
1224                // Track combined quality assessment
1225                let temp_quality = record.assess_temperature_quality();
1226                let precip_quality = record.assess_precipitation_quality();
1227                let wind_quality = record.assess_wind_quality();
1228
1229                for quality in [temp_quality, precip_quality, wind_quality] {
1230                    match quality {
1231                        DataQuality::Valid => combined_valid += 1,
1232                        DataQuality::SuspectOriginal => combined_suspect_original += 1,
1233                        DataQuality::SuspectRange => combined_suspect_range += 1,
1234                        DataQuality::SuspectBoth => combined_suspect_both += 1,
1235                        DataQuality::Invalid => combined_invalid += 1,
1236                        DataQuality::Missing => combined_missing += 1,
1237                    }
1238                }
1239
1240                // Store for sampling
1241                all_records.push(record);
1242            }
1243        }
1244
1245        // Create diverse sampling
1246        let sample_records = self.create_diverse_sample(&all_records, sample_size);
1247
1248        // Calculate temporal ranges per metric
1249        temp_dates.sort();
1250        precip_dates.sort();
1251        wind_dates.sort();
1252
1253        let temperature_range = if !temp_dates.is_empty() {
1254            Some((temp_dates[0], temp_dates[temp_dates.len() - 1]))
1255        } else {
1256            None
1257        };
1258
1259        let precipitation_range = if !precip_dates.is_empty() {
1260            Some((precip_dates[0], precip_dates[precip_dates.len() - 1]))
1261        } else {
1262            None
1263        };
1264
1265        let wind_range = if !wind_dates.is_empty() {
1266            Some((wind_dates[0], wind_dates[wind_dates.len() - 1]))
1267        } else {
1268            None
1269        };
1270
1271        // Countries (simplified - could be enhanced with actual geographic lookup)
1272        let countries = if min_lon < -5.0 && max_lat > 53.0 {
1273            vec!["GB".to_string(), "IE".to_string()]
1274        } else if max_lat > 55.0 {
1275            vec!["GB".to_string()]
1276        } else {
1277            vec!["IE".to_string()]
1278        };
1279
1280        Ok(WeatherDatasetSummary {
1281            total_records,
1282            total_stations: stations.len(),
1283            geographic_bounds: GeographicBounds {
1284                min_lat,
1285                max_lat,
1286                min_lon,
1287                max_lon,
1288                countries,
1289            },
1290            temporal_coverage: TemporalCoverage {
1291                overall_start: min_date
1292                    .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap()),
1293                overall_end: max_date
1294                    .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(2000, 1, 1).unwrap()),
1295                temperature_range,
1296                precipitation_range,
1297                wind_range,
1298            },
1299            metric_statistics: MetricStatistics {
1300                temperature_records: temp_records,
1301                temperature_stations: temp_stations.len(),
1302                precipitation_records: precip_records,
1303                precipitation_stations: precip_stations.len(),
1304                wind_records,
1305                wind_stations: wind_stations.len(),
1306                temperature_range: if min_temp_val != f32::MAX && max_temp_val != f32::MIN {
1307                    Some((min_temp_val, max_temp_val))
1308                } else {
1309                    None
1310                },
1311                precipitation_range: if max_precip_val != f32::MIN {
1312                    Some((0.0, max_precip_val))
1313                } else {
1314                    None
1315                },
1316                wind_range: if max_wind_val != f32::MIN {
1317                    Some((0.0, max_wind_val))
1318                } else {
1319                    None
1320                },
1321            },
1322            data_quality: EnhancedDataQuality {
1323                ecad_valid,
1324                ecad_suspect,
1325                ecad_missing,
1326                physically_valid,
1327                physically_suspect,
1328                physically_invalid,
1329                combined_valid,
1330                combined_suspect_original,
1331                combined_suspect_range,
1332                combined_suspect_both,
1333                combined_invalid,
1334                combined_missing,
1335                validation_errors: 0, // Will be populated from integrity report if available
1336            },
1337            sample_records,
1338            extreme_records: ExtremeRecords {
1339                coldest: coldest_record,
1340                hottest: hottest_record,
1341                wettest: wettest_record,
1342                windiest: windiest_record,
1343            },
1344        })
1345    }
1346
1347    /// Create diverse sample of records for display
1348    fn create_diverse_sample(
1349        &self,
1350        all_records: &[WeatherRecord],
1351        sample_size: usize,
1352    ) -> Vec<WeatherRecord> {
1353        use std::collections::HashMap;
1354
1355        if all_records.is_empty() || sample_size == 0 {
1356            return Vec::new();
1357        }
1358
1359        let mut samples = Vec::new();
1360        let mut station_counts: HashMap<String, usize> = HashMap::new();
1361
1362        // Strategy: Sample diverse stations and metric combinations
1363        let total_records = all_records.len();
1364        let step = if total_records > sample_size * 100 {
1365            total_records / (sample_size * 50) // Sample more spread out for large datasets
1366        } else {
1367            std::cmp::max(1, total_records / sample_size)
1368        };
1369
1370        for (i, record) in all_records.iter().enumerate() {
1371            if i % step == 0 && samples.len() < sample_size {
1372                // Limit samples per station for diversity
1373                let station_count = station_counts
1374                    .entry(record.station_name.clone())
1375                    .or_insert(0);
1376                if *station_count < 2 {
1377                    // Max 2 samples per station
1378                    samples.push(record.clone());
1379                    *station_count += 1;
1380                }
1381            }
1382        }
1383
1384        // If we still need more samples, fill with any remaining records
1385        if samples.len() < sample_size {
1386            for record in all_records.iter().step_by(step * 2) {
1387                if samples.len() >= sample_size {
1388                    break;
1389                }
1390                if !samples
1391                    .iter()
1392                    .any(|s| s.station_name == record.station_name && s.date == record.date)
1393                {
1394                    samples.push(record.clone());
1395                }
1396            }
1397        }
1398
1399        samples.truncate(sample_size);
1400        samples
1401    }
1402}
1403
/// Record layout of a Parquet file as reported by
/// `ParquetWriter::detect_schema_type`, which decides by column count alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SchemaType {
    /// Reported for files whose schema has 9 columns.
    ConsolidatedRecord,
    /// Reported for files whose schema has 13 columns, or 16 when the
    /// validation fields are included.
    WeatherRecord,
    /// Reported for every other column count.
    Unknown,
}
1410
1411#[derive(Debug, Clone)]
1412pub struct WeatherDatasetSummary {
1413    pub total_records: usize,
1414    pub total_stations: usize,
1415    pub geographic_bounds: GeographicBounds,
1416    pub temporal_coverage: TemporalCoverage,
1417    pub metric_statistics: MetricStatistics,
1418    pub data_quality: EnhancedDataQuality,
1419    pub sample_records: Vec<WeatherRecord>,
1420    pub extreme_records: ExtremeRecords,
1421}
1422
/// Latitude/longitude extent of a dataset plus the countries attributed to it.
///
/// `ParquetWriter::analyze_weather_dataset` starts the minima at `f64::MAX`
/// and the maxima at `f64::MIN`, so those values remain when no row was read.
#[derive(Debug, Clone, PartialEq)]
pub struct GeographicBounds {
    /// Smallest latitude seen.
    pub min_lat: f64,
    /// Largest latitude seen.
    pub max_lat: f64,
    /// Smallest longitude seen.
    pub min_lon: f64,
    /// Largest longitude seen.
    pub max_lon: f64,
    /// Country codes attributed to the dataset. The analysis derives them
    /// from the bounds with a simple heuristic that yields "GB" and/or "IE".
    pub countries: Vec<String>,
}
1431
1432#[derive(Debug, Clone)]
1433pub struct TemporalCoverage {
1434    pub overall_start: chrono::NaiveDate,
1435    pub overall_end: chrono::NaiveDate,
1436    pub temperature_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
1437    pub precipitation_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
1438    pub wind_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
1439}
1440
/// Per-metric coverage counts and value ranges of a dataset.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricStatistics {
    /// Number of records that carry temperature data.
    pub temperature_records: usize,
    /// Number of distinct stations with temperature data.
    pub temperature_stations: usize,
    /// Number of records that carry precipitation data.
    pub precipitation_records: usize,
    /// Number of distinct stations with precipitation data.
    pub precipitation_stations: usize,
    /// Number of records that carry wind speed data.
    pub wind_records: usize,
    /// Number of distinct stations with wind speed data.
    pub wind_stations: usize,
    /// (lowest minimum temperature, highest maximum temperature) among
    /// readings not assessed as invalid; `None` when either is unavailable.
    pub temperature_range: Option<(f32, f32)>,
    /// (lower bound, highest precipitation). The analysis in
    /// `ParquetWriter::analyze_weather_dataset` always reports 0.0 as the
    /// lower bound.
    pub precipitation_range: Option<(f32, f32)>,
    /// (lower bound, highest wind speed). The analysis always reports 0.0 as
    /// the lower bound.
    pub wind_range: Option<(f32, f32)>,
}
1453
/// Quality tallies of a dataset.
///
/// `ParquetWriter::analyze_weather_dataset` counts per metric reading, not
/// per record, so a single record can add to several counters.
/// `Default` yields a tally with every counter at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EnhancedDataQuality {
    // ECAD quality flag assessment
    /// Readings whose ECAD flag is "0" (for temperature: contains '0').
    pub ecad_valid: usize,
    /// Readings whose ECAD flag is "1" (for temperature: contains '1' but
    /// no '0').
    pub ecad_suspect: usize,
    /// Readings whose ECAD flag is "9" (for temperature: contains '9' but
    /// neither '0' nor '1').
    pub ecad_missing: usize,

    // Physical validation assessment
    /// Readings whose physical validation is `Valid`.
    pub physically_valid: usize,
    /// Readings whose physical validation is `Suspect`.
    pub physically_suspect: usize,
    /// Readings whose physical validation is `Invalid`.
    pub physically_invalid: usize,

    // Combined quality assessment
    /// Readings assessed as `DataQuality::Valid`.
    pub combined_valid: usize,
    /// Readings assessed as `DataQuality::SuspectOriginal`.
    pub combined_suspect_original: usize,
    /// Readings assessed as `DataQuality::SuspectRange`.
    pub combined_suspect_range: usize,
    /// Readings assessed as `DataQuality::SuspectBoth`.
    pub combined_suspect_both: usize,
    /// Readings assessed as `DataQuality::Invalid`.
    pub combined_invalid: usize,
    /// Readings assessed as `DataQuality::Missing`.
    pub combined_missing: usize,

    /// Number of validation errors. The dataset analysis itself leaves this
    /// at 0; it is meant to be filled from an integrity report if available.
    pub validation_errors: usize,
}
1476
1477#[derive(Debug, Clone)]
1478pub struct ExtremeRecords {
1479    pub coldest: Option<WeatherRecord>,
1480    pub hottest: Option<WeatherRecord>,
1481    pub wettest: Option<WeatherRecord>,
1482    pub windiest: Option<WeatherRecord>,
1483}
1484
1485impl WeatherDatasetSummary {
1486    pub fn display_comprehensive_summary(&self) -> String {
1487        let mut summary = String::new();
1488
1489        // Header
1490        summary.push_str("UNIFIED WEATHER DATASET ANALYSIS\n");
1491        summary.push_str("================================\n\n");
1492
1493        // Dataset Overview
1494        summary.push_str(&format!(
1495            "Dataset Overview:\n\
1496            - Records: {} unified weather records\n\
1497            - Stations: {} across {} ({:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E)\n\
1498            - Timespan: {} to {} ({} years)\n\n",
1499            self.total_records,
1500            self.total_stations,
1501            self.geographic_bounds.countries.join("/"),
1502            self.geographic_bounds.min_lat,
1503            self.geographic_bounds.max_lat,
1504            if self.geographic_bounds.min_lon < 0.0 {
1505                -self.geographic_bounds.min_lon
1506            } else {
1507                self.geographic_bounds.min_lon
1508            },
1509            self.geographic_bounds.max_lon,
1510            self.temporal_coverage.overall_start,
1511            self.temporal_coverage.overall_end,
1512            self.temporal_coverage.overall_end.year() - self.temporal_coverage.overall_start.year()
1513        ));
1514
1515        // Metric Coverage Table
1516        summary.push_str("Metric Coverage:\n");
1517        summary.push_str(
1518            "┌─────────────────┬──────────┬─────────────┬──────────────┬─────────────┐\n",
1519        );
1520        summary.push_str(
1521            "│ Metric          │ Stations │ Records     │ Coverage     │ Date Range  │\n",
1522        );
1523        summary.push_str(
1524            "├─────────────────┼──────────┼─────────────┼──────────────┼─────────────┤\n",
1525        );
1526
1527        // Temperature row
1528        let temp_coverage = if self.total_records > 0 {
1529            (self.metric_statistics.temperature_records as f32 / self.total_records as f32) * 100.0
1530        } else {
1531            0.0
1532        };
1533
1534        let temp_range = if let Some((start, end)) = self.temporal_coverage.temperature_range {
1535            format!("{}-{}", start.year(), end.year())
1536        } else {
1537            "N/A".to_string()
1538        };
1539
1540        summary.push_str(&format!(
1541            "│ Temperature     │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1542            self.metric_statistics.temperature_stations,
1543            self.metric_statistics.temperature_records,
1544            temp_coverage,
1545            temp_range
1546        ));
1547
1548        // Precipitation row
1549        let precip_coverage = if self.total_records > 0 {
1550            (self.metric_statistics.precipitation_records as f32 / self.total_records as f32)
1551                * 100.0
1552        } else {
1553            0.0
1554        };
1555
1556        let precip_range = if let Some((start, end)) = self.temporal_coverage.precipitation_range {
1557            format!("{}-{}", start.year(), end.year())
1558        } else {
1559            "N/A".to_string()
1560        };
1561
1562        summary.push_str(&format!(
1563            "│ Precipitation   │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1564            self.metric_statistics.precipitation_stations,
1565            self.metric_statistics.precipitation_records,
1566            precip_coverage,
1567            precip_range
1568        ));
1569
1570        // Wind row
1571        let wind_coverage = if self.total_records > 0 {
1572            (self.metric_statistics.wind_records as f32 / self.total_records as f32) * 100.0
1573        } else {
1574            0.0
1575        };
1576
1577        let wind_range = if let Some((start, end)) = self.temporal_coverage.wind_range {
1578            format!("{}-{}", start.year(), end.year())
1579        } else {
1580            "N/A".to_string()
1581        };
1582
1583        summary.push_str(&format!(
1584            "│ Wind Speed      │ {:8} │ {:11} │ {:10.1}%  │ {:11} │\n",
1585            self.metric_statistics.wind_stations,
1586            self.metric_statistics.wind_records,
1587            wind_coverage,
1588            wind_range
1589        ));
1590
1591        summary.push_str(
1592            "└─────────────────┴──────────┴─────────────┴──────────────┴─────────────┘\n\n",
1593        );
1594
1595        // Sample Records (diverse)
1596        if !self.sample_records.is_empty() {
1597            summary.push_str("Sample Records (diverse stations & metrics):\n");
1598            for (i, record) in self.sample_records.iter().enumerate() {
1599                let mut metrics_display = Vec::new();
1600
1601                // Temperature display
1602                let temp_parts: Vec<String> = [
1603                    record.temp_min.map(|t| format!("min={:.1}°C", t)),
1604                    record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
1605                    record.temp_max.map(|t| format!("max={:.1}°C", t)),
1606                ]
1607                .into_iter()
1608                .flatten()
1609                .collect();
1610
1611                if !temp_parts.is_empty() {
1612                    metrics_display.push(format!("temp({})", temp_parts.join(", ")));
1613                }
1614
1615                if let Some(precip) = record.precipitation {
1616                    metrics_display.push(format!("precip={:.1}mm", precip));
1617                }
1618
1619                if let Some(wind) = record.wind_speed {
1620                    metrics_display.push(format!("wind={:.1}m/s", wind));
1621                }
1622
1623                let metrics_str = if metrics_display.is_empty() {
1624                    "no data".to_string()
1625                } else {
1626                    metrics_display.join(", ")
1627                };
1628
1629                summary.push_str(&format!(
1630                    "{}. {} on {}: {}\n",
1631                    i + 1,
1632                    record.station_name,
1633                    record.date,
1634                    metrics_str
1635                ));
1636            }
1637            summary.push('\n');
1638        }
1639
1640        // Extreme Records
1641        summary.push_str("Extreme Records:\n");
1642        if let Some(ref coldest) = self.extreme_records.coldest {
1643            if let Some(min_temp) = coldest.temp_min {
1644                summary.push_str(&format!(
1645                    "- Coldest: {:.1}°C at {} ({})\n",
1646                    min_temp, coldest.station_name, coldest.date
1647                ));
1648            }
1649        }
1650
1651        if let Some(ref hottest) = self.extreme_records.hottest {
1652            if let Some(max_temp) = hottest.temp_max {
1653                summary.push_str(&format!(
1654                    "- Hottest: {:.1}°C at {} ({})\n",
1655                    max_temp, hottest.station_name, hottest.date
1656                ));
1657            }
1658        }
1659
1660        if let Some(ref wettest) = self.extreme_records.wettest {
1661            if let Some(precip) = wettest.precipitation {
1662                summary.push_str(&format!(
1663                    "- Wettest: {:.1}mm at {} ({})\n",
1664                    precip, wettest.station_name, wettest.date
1665                ));
1666            }
1667        }
1668
1669        if let Some(ref windiest) = self.extreme_records.windiest {
1670            if let Some(wind) = windiest.wind_speed {
1671                summary.push_str(&format!(
1672                    "- Windiest: {:.1}m/s at {} ({})\n",
1673                    wind, windiest.station_name, windiest.date
1674                ));
1675            }
1676        }
1677        summary.push('\n');
1678
1679        // Enhanced Data Quality Analysis
1680        summary.push_str("Data Quality Analysis:\n");
1681
1682        let total_ecad = self.data_quality.ecad_valid
1683            + self.data_quality.ecad_suspect
1684            + self.data_quality.ecad_missing;
1685        if total_ecad > 0 {
1686            summary.push_str("├─ ECAD Assessment:\n");
1687            summary.push_str(&format!(
1688                "│  ├─ Valid (flag=0): {} ({:.1}%)\n",
1689                self.data_quality.ecad_valid,
1690                (self.data_quality.ecad_valid as f32 / total_ecad as f32) * 100.0
1691            ));
1692            summary.push_str(&format!(
1693                "│  ├─ Suspect (flag=1): {} ({:.1}%)\n",
1694                self.data_quality.ecad_suspect,
1695                (self.data_quality.ecad_suspect as f32 / total_ecad as f32) * 100.0
1696            ));
1697            summary.push_str(&format!(
1698                "│  └─ Missing (flag=9): {} ({:.1}%)\n",
1699                self.data_quality.ecad_missing,
1700                (self.data_quality.ecad_missing as f32 / total_ecad as f32) * 100.0
1701            ));
1702            summary.push_str("│\n");
1703        }
1704
1705        let total_physical = self.data_quality.physically_valid
1706            + self.data_quality.physically_suspect
1707            + self.data_quality.physically_invalid;
1708        if total_physical > 0 {
1709            summary.push_str("├─ Physical Validation:\n");
1710            summary.push_str(&format!(
1711                "│  ├─ Valid: {} ({:.1}%)\n",
1712                self.data_quality.physically_valid,
1713                (self.data_quality.physically_valid as f32 / total_physical as f32) * 100.0
1714            ));
1715            summary.push_str(&format!(
1716                "│  ├─ Suspect: {} ({:.1}%)\n",
1717                self.data_quality.physically_suspect,
1718                (self.data_quality.physically_suspect as f32 / total_physical as f32) * 100.0
1719            ));
1720            summary.push_str(&format!(
1721                "│  └─ Invalid: {} ({:.3}%)\n",
1722                self.data_quality.physically_invalid,
1723                (self.data_quality.physically_invalid as f32 / total_physical as f32) * 100.0
1724            ));
1725            summary.push_str("│\n");
1726        }
1727
1728        let total_combined = self.data_quality.combined_valid
1729            + self.data_quality.combined_suspect_original
1730            + self.data_quality.combined_suspect_range
1731            + self.data_quality.combined_suspect_both
1732            + self.data_quality.combined_invalid
1733            + self.data_quality.combined_missing;
1734        if total_combined > 0 {
1735            summary.push_str("└─ Combined Quality:\n");
1736            summary.push_str(&format!(
1737                "   ├─ Valid: {} ({:.1}%)\n",
1738                self.data_quality.combined_valid,
1739                (self.data_quality.combined_valid as f32 / total_combined as f32) * 100.0
1740            ));
1741            summary.push_str(&format!(
1742                "   ├─ Suspect (original): {} ({:.1}%)\n",
1743                self.data_quality.combined_suspect_original,
1744                (self.data_quality.combined_suspect_original as f32 / total_combined as f32)
1745                    * 100.0
1746            ));
1747            summary.push_str(&format!(
1748                "   ├─ Suspect (range): {} ({:.2}%)\n",
1749                self.data_quality.combined_suspect_range,
1750                (self.data_quality.combined_suspect_range as f32 / total_combined as f32) * 100.0
1751            ));
1752            if self.data_quality.combined_invalid > 0 {
1753                summary.push_str(&format!(
1754                    "   ├─ Invalid: {} ({:.3}%)\n",
1755                    self.data_quality.combined_invalid,
1756                    (self.data_quality.combined_invalid as f32 / total_combined as f32) * 100.0
1757                ));
1758            }
1759            summary.push_str(&format!(
1760                "   └─ Missing: {} ({:.1}%)\n",
1761                self.data_quality.combined_missing,
1762                (self.data_quality.combined_missing as f32 / total_combined as f32) * 100.0
1763            ));
1764        }
1765
1766        if self.data_quality.physically_invalid > 0 {
1767            summary.push_str(&format!(
1768                "\n⚠️  Found {} physically impossible values that were excluded from extreme records analysis\n",
1769                self.data_quality.physically_invalid
1770            ));
1771        }
1772
1773        summary
1774    }
1775}
1776
1777impl Default for ParquetWriter {
1778    fn default() -> Self {
1779        Self::new()
1780    }
1781}
1782
1783#[derive(Debug)]
1784pub struct ParquetFileInfo {
1785    pub total_rows: i64,
1786    pub row_groups: i32,
1787    pub row_group_sizes: Vec<i64>,
1788    pub file_size: u64,
1789    pub compression: Compression,
1790}
1791
1792impl ParquetFileInfo {
1793    pub fn summary(&self) -> String {
1794        format!(
1795            "Parquet File Summary:\n\
1796            - Total rows: {}\n\
1797            - Row groups: {}\n\
1798            - File size: {:.2} MB\n\
1799            - Compression: {:?}\n\
1800            - Avg rows per group: {:.0}",
1801            self.total_rows,
1802            self.row_groups,
1803            self.file_size as f64 / 1_048_576.0, // Convert to MB
1804            self.compression,
1805            self.total_rows as f64 / self.row_groups as f64
1806        )
1807    }
1808}
1809
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ConsolidatedRecord;
    use chrono::NaiveDate;
    use tempfile::NamedTempFile;

    /// Builds the one fixture record that the write tests below share.
    fn sample_record() -> ConsolidatedRecord {
        let observed_on = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        ConsolidatedRecord::new(
            12345,
            "Test Station".to_string(),
            observed_on,
            51.5074,
            -0.1278,
            15.0,
            25.0,
            20.0,
            "000".to_string(),
        )
    }

    /// Writing an empty slice must report success.
    #[test]
    fn test_write_empty_records() {
        let target = NamedTempFile::new().unwrap();
        let outcome = ParquetWriter::new().write_records(&[], target.path());
        assert!(outcome.is_ok());
    }

    /// Writing a single record must leave a non-empty file on disk.
    #[test]
    fn test_write_single_record() -> Result<()> {
        let target = NamedTempFile::new().unwrap();
        ParquetWriter::new().write_records(&[sample_record()], target.path())?;

        // Verify file was created and has content
        let written_bytes = std::fs::metadata(target.path())?.len();
        assert!(written_bytes > 0);

        Ok(())
    }

    /// Every supported codec name must be accepted and produce a successful write.
    #[test]
    fn test_different_compressions() -> Result<()> {
        for codec in ["snappy", "gzip", "lz4", "zstd", "none"] {
            let writer = ParquetWriter::new().with_compression(codec)?;
            let target = NamedTempFile::new().unwrap();

            let outcome = writer.write_records(&[sample_record()], target.path());
            assert!(outcome.is_ok(), "Failed with compression: {}", codec);
        }

        Ok(())
    }
}