1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, WeatherRecord};
3use crate::utils::constants::DEFAULT_ROW_GROUP_SIZE;
4use arrow::array::*;
5use arrow::datatypes::{DataType, Field, Schema};
6use arrow::record_batch::RecordBatch;
7use chrono::Datelike;
8use parquet::arrow::ArrowWriter;
9use parquet::basic::{Compression, GzipLevel};
10use parquet::file::properties::WriterProperties;
11use std::fs::File;
12use std::path::Path;
13use std::sync::Arc;
14
/// Writes weather datasets to Apache Parquet files.
///
/// Compression codec and row-group size are configurable through the
/// builder-style `with_*` methods; defaults are set in [`ParquetWriter::new`].
pub struct ParquetWriter {
    // Parquet compression codec applied to all written files.
    compression: Compression,
    // Maximum number of rows per Parquet row group.
    row_group_size: usize,
}
19
20impl ParquetWriter {
21 pub fn new() -> Self {
22 Self {
23 compression: Compression::SNAPPY,
24 row_group_size: DEFAULT_ROW_GROUP_SIZE,
25 }
26 }
27
28 pub fn with_compression(mut self, compression: &str) -> Result<Self> {
29 self.compression = match compression.to_lowercase().as_str() {
30 "snappy" => Compression::SNAPPY,
31 "gzip" => Compression::GZIP(GzipLevel::default()),
32 "lz4" => Compression::LZ4,
33 "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
34 "none" => Compression::UNCOMPRESSED,
35 _ => {
36 return Err(crate::error::ProcessingError::Config(format!(
37 "Unsupported compression: {}",
38 compression
39 )))
40 }
41 };
42 Ok(self)
43 }
44
45 pub fn with_row_group_size(mut self, size: usize) -> Self {
46 self.row_group_size = size;
47 self
48 }
49
50 pub fn write_records(&self, records: &[ConsolidatedRecord], path: &Path) -> Result<()> {
52 if records.is_empty() {
53 return Ok(());
54 }
55
56 let schema = self.create_schema();
57 let batch = self.records_to_batch(records, schema.clone())?;
58
59 let file = File::create(path)?;
60 let props = WriterProperties::builder()
61 .set_compression(self.compression)
62 .set_max_row_group_size(self.row_group_size)
63 .build();
64
65 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
66 writer.write(&batch)?;
67 writer.close()?;
68
69 Ok(())
70 }
71
72 pub fn write_records_batched(
74 &self,
75 records: &[ConsolidatedRecord],
76 path: &Path,
77 batch_size: usize,
78 ) -> Result<()> {
79 if records.is_empty() {
80 return Ok(());
81 }
82
83 let schema = self.create_schema();
84 let file = File::create(path)?;
85 let props = WriterProperties::builder()
86 .set_compression(self.compression)
87 .set_max_row_group_size(self.row_group_size)
88 .build();
89
90 let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
91
92 for chunk in records.chunks(batch_size) {
94 let batch = self.records_to_batch(chunk, schema.clone())?;
95 writer.write(&batch)?;
96 }
97
98 writer.close()?;
99 Ok(())
100 }
101
102 fn create_schema(&self) -> Arc<Schema> {
104 let fields = vec![
105 Field::new("station_id", DataType::UInt32, false),
106 Field::new("station_name", DataType::Utf8, false),
107 Field::new("date", DataType::Date32, false),
108 Field::new("latitude", DataType::Float64, false),
109 Field::new("longitude", DataType::Float64, false),
110 Field::new("min_temp", DataType::Float32, false),
111 Field::new("max_temp", DataType::Float32, false),
112 Field::new("avg_temp", DataType::Float32, false),
113 Field::new("quality_flags", DataType::Utf8, false),
114 ];
115
116 Arc::new(Schema::new(fields))
117 }
118
119 fn records_to_batch(
121 &self,
122 records: &[ConsolidatedRecord],
123 schema: Arc<Schema>,
124 ) -> Result<RecordBatch> {
125 let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
127 let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
128 let dates: Vec<i32> = records.iter().map(|r| r.date.num_days_from_ce()).collect();
129 let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
130 let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
131 let min_temps: Vec<f32> = records.iter().map(|r| r.min_temp).collect();
132 let max_temps: Vec<f32> = records.iter().map(|r| r.max_temp).collect();
133 let avg_temps: Vec<f32> = records.iter().map(|r| r.avg_temp).collect();
134 let quality_flags: Vec<String> = records.iter().map(|r| r.quality_flags.clone()).collect();
135
136 let station_id_array = Arc::new(UInt32Array::from(station_ids));
138 let station_name_array = Arc::new(StringArray::from(station_names));
139 let date_array = Arc::new(Date32Array::from(dates));
140 let latitude_array = Arc::new(Float64Array::from(latitudes));
141 let longitude_array = Arc::new(Float64Array::from(longitudes));
142 let min_temp_array = Arc::new(Float32Array::from(min_temps));
143 let max_temp_array = Arc::new(Float32Array::from(max_temps));
144 let avg_temp_array = Arc::new(Float32Array::from(avg_temps));
145 let quality_flags_array = Arc::new(StringArray::from(quality_flags));
146
147 let batch = RecordBatch::try_new(
149 schema,
150 vec![
151 station_id_array,
152 station_name_array,
153 date_array,
154 latitude_array,
155 longitude_array,
156 min_temp_array,
157 max_temp_array,
158 avg_temp_array,
159 quality_flags_array,
160 ],
161 )?;
162
163 Ok(batch)
164 }
165
166 pub fn read_sample_records(
168 &self,
169 path: &Path,
170 limit: usize,
171 ) -> Result<Vec<ConsolidatedRecord>> {
172 use arrow::array::*;
173 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
174
175 let file = File::open(path)?;
176 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
177 .with_batch_size(limit.min(8192))
178 .build()?;
179
180 let mut records = Vec::new();
181 let mut total_read = 0;
182
183 for batch_result in parquet_reader {
184 let batch = batch_result?;
185
186 let station_ids = batch
188 .column(0)
189 .as_any()
190 .downcast_ref::<UInt32Array>()
191 .ok_or_else(|| {
192 crate::error::ProcessingError::Config(
193 "Invalid station_id column type".to_string(),
194 )
195 })?;
196 let station_names = batch
197 .column(1)
198 .as_any()
199 .downcast_ref::<StringArray>()
200 .ok_or_else(|| {
201 crate::error::ProcessingError::Config(
202 "Invalid station_name column type".to_string(),
203 )
204 })?;
205 let dates = batch
206 .column(2)
207 .as_any()
208 .downcast_ref::<Date32Array>()
209 .ok_or_else(|| {
210 crate::error::ProcessingError::Config("Invalid date column type".to_string())
211 })?;
212 let latitudes = batch
213 .column(3)
214 .as_any()
215 .downcast_ref::<Float64Array>()
216 .ok_or_else(|| {
217 crate::error::ProcessingError::Config(
218 "Invalid latitude column type".to_string(),
219 )
220 })?;
221 let longitudes = batch
222 .column(4)
223 .as_any()
224 .downcast_ref::<Float64Array>()
225 .ok_or_else(|| {
226 crate::error::ProcessingError::Config(
227 "Invalid longitude column type".to_string(),
228 )
229 })?;
230 let min_temps = batch
231 .column(5)
232 .as_any()
233 .downcast_ref::<Float32Array>()
234 .ok_or_else(|| {
235 crate::error::ProcessingError::Config(
236 "Invalid min_temp column type".to_string(),
237 )
238 })?;
239 let max_temps = batch
240 .column(6)
241 .as_any()
242 .downcast_ref::<Float32Array>()
243 .ok_or_else(|| {
244 crate::error::ProcessingError::Config(
245 "Invalid max_temp column type".to_string(),
246 )
247 })?;
248 let avg_temps = batch
249 .column(7)
250 .as_any()
251 .downcast_ref::<Float32Array>()
252 .ok_or_else(|| {
253 crate::error::ProcessingError::Config(
254 "Invalid avg_temp column type".to_string(),
255 )
256 })?;
257 let quality_flags = batch
258 .column(8)
259 .as_any()
260 .downcast_ref::<StringArray>()
261 .ok_or_else(|| {
262 crate::error::ProcessingError::Config(
263 "Invalid quality_flags column type".to_string(),
264 )
265 })?;
266
267 let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
269
270 for i in 0..batch_records_to_read {
271 let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i))
272 .ok_or_else(|| {
273 crate::error::ProcessingError::Config(
274 "Invalid date in Parquet file".to_string(),
275 )
276 })?;
277
278 let record = ConsolidatedRecord::new(
279 station_ids.value(i),
280 station_names.value(i).to_string(),
281 date,
282 latitudes.value(i),
283 longitudes.value(i),
284 min_temps.value(i),
285 max_temps.value(i),
286 avg_temps.value(i),
287 quality_flags.value(i).to_string(),
288 );
289
290 records.push(record);
291 total_read += 1;
292
293 if total_read >= limit {
294 break;
295 }
296 }
297
298 if total_read >= limit {
299 break;
300 }
301 }
302
303 Ok(records)
304 }
305
306 pub fn get_file_info(&self, path: &Path) -> Result<ParquetFileInfo> {
308 use parquet::file::reader::{FileReader, SerializedFileReader};
309 use std::fs::File;
310
311 let file = File::open(path)?;
312 let reader = SerializedFileReader::new(file)?;
313 let metadata = reader.metadata();
314
315 let file_metadata = metadata.file_metadata();
316 let row_groups = metadata.num_row_groups();
317 let total_rows = file_metadata.num_rows();
318 let file_size = std::fs::metadata(path)?.len();
319
320 let mut row_group_sizes = Vec::new();
321 for i in 0..row_groups {
322 let rg_metadata = metadata.row_group(i);
323 row_group_sizes.push(rg_metadata.num_rows());
324 }
325
326 Ok(ParquetFileInfo {
327 total_rows,
328 row_groups: row_groups as i32,
329 row_group_sizes,
330 file_size,
331 compression: self.compression,
332 })
333 }
334
335 pub fn write_weather_records(&self, records: &[WeatherRecord], path: &Path) -> Result<()> {
337 if records.is_empty() {
338 return Ok(());
339 }
340
341 let schema = self.create_weather_schema();
342 let batch = self.weather_records_to_batch(records, schema.clone())?;
343
344 let file = File::create(path)?;
345 let props = WriterProperties::builder()
346 .set_compression(self.compression)
347 .set_max_row_group_size(self.row_group_size)
348 .build();
349
350 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
351 writer.write(&batch)?;
352 writer.close()?;
353
354 Ok(())
355 }
356
357 pub fn write_weather_records_batched(
359 &self,
360 records: &[WeatherRecord],
361 path: &Path,
362 batch_size: usize,
363 ) -> Result<()> {
364 if records.is_empty() {
365 return Ok(());
366 }
367
368 let schema = self.create_weather_schema();
369 let file = File::create(path)?;
370 let props = WriterProperties::builder()
371 .set_compression(self.compression)
372 .set_max_row_group_size(self.row_group_size)
373 .build();
374
375 let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
376
377 for chunk in records.chunks(batch_size) {
379 let batch = self.weather_records_to_batch(chunk, schema.clone())?;
380 writer.write(&batch)?;
381 }
382
383 writer.close()?;
384 Ok(())
385 }
386
387 fn create_weather_schema(&self) -> Arc<Schema> {
389 let fields = vec![
390 Field::new("station_id", DataType::UInt32, false),
391 Field::new("station_name", DataType::Utf8, false),
392 Field::new("date", DataType::Date32, false),
393 Field::new("latitude", DataType::Float64, false),
394 Field::new("longitude", DataType::Float64, false),
395 Field::new("temp_min", DataType::Float32, true),
397 Field::new("temp_max", DataType::Float32, true),
398 Field::new("temp_avg", DataType::Float32, true),
399 Field::new("precipitation", DataType::Float32, true),
401 Field::new("wind_speed", DataType::Float32, true),
403 Field::new("temp_quality", DataType::Utf8, true),
405 Field::new("precip_quality", DataType::Utf8, true),
406 Field::new("wind_quality", DataType::Utf8, true),
407 Field::new("temp_validation", DataType::Utf8, true),
409 Field::new("precip_validation", DataType::Utf8, true),
410 Field::new("wind_validation", DataType::Utf8, true),
411 ];
412
413 Arc::new(Schema::new(fields))
414 }
415
416 fn weather_records_to_batch(
418 &self,
419 records: &[WeatherRecord],
420 schema: Arc<Schema>,
421 ) -> Result<RecordBatch> {
422 let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
424 let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
425 let dates: Vec<i32> = records.iter().map(|r| r.date.num_days_from_ce()).collect();
426 let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
427 let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
428
429 let temp_mins: Vec<Option<f32>> = records.iter().map(|r| r.temp_min).collect();
431 let temp_maxs: Vec<Option<f32>> = records.iter().map(|r| r.temp_max).collect();
432 let temp_avgs: Vec<Option<f32>> = records.iter().map(|r| r.temp_avg).collect();
433
434 let precipitations: Vec<Option<f32>> = records.iter().map(|r| r.precipitation).collect();
436 let wind_speeds: Vec<Option<f32>> = records.iter().map(|r| r.wind_speed).collect();
437
438 let temp_qualities: Vec<Option<String>> =
440 records.iter().map(|r| r.temp_quality.clone()).collect();
441 let precip_qualities: Vec<Option<String>> =
442 records.iter().map(|r| r.precip_quality.clone()).collect();
443 let wind_qualities: Vec<Option<String>> =
444 records.iter().map(|r| r.wind_quality.clone()).collect();
445
446 let temp_validations: Vec<Option<String>> = records
448 .iter()
449 .map(|r| r.temp_validation.map(|v| format!("{:?}", v)))
450 .collect();
451 let precip_validations: Vec<Option<String>> = records
452 .iter()
453 .map(|r| r.precip_validation.map(|v| format!("{:?}", v)))
454 .collect();
455 let wind_validations: Vec<Option<String>> = records
456 .iter()
457 .map(|r| r.wind_validation.map(|v| format!("{:?}", v)))
458 .collect();
459
460 let station_id_array = Arc::new(UInt32Array::from(station_ids));
462 let station_name_array = Arc::new(StringArray::from(station_names));
463 let date_array = Arc::new(Date32Array::from(dates));
464 let latitude_array = Arc::new(Float64Array::from(latitudes));
465 let longitude_array = Arc::new(Float64Array::from(longitudes));
466
467 let temp_min_array = Arc::new(Float32Array::from(temp_mins));
469 let temp_max_array = Arc::new(Float32Array::from(temp_maxs));
470 let temp_avg_array = Arc::new(Float32Array::from(temp_avgs));
471 let precipitation_array = Arc::new(Float32Array::from(precipitations));
472 let wind_speed_array = Arc::new(Float32Array::from(wind_speeds));
473
474 let temp_quality_array = Arc::new(StringArray::from(temp_qualities));
475 let precip_quality_array = Arc::new(StringArray::from(precip_qualities));
476 let wind_quality_array = Arc::new(StringArray::from(wind_qualities));
477
478 let temp_validation_array = Arc::new(StringArray::from(temp_validations));
479 let precip_validation_array = Arc::new(StringArray::from(precip_validations));
480 let wind_validation_array = Arc::new(StringArray::from(wind_validations));
481
482 let batch = RecordBatch::try_new(
484 schema,
485 vec![
486 station_id_array,
487 station_name_array,
488 date_array,
489 latitude_array,
490 longitude_array,
491 temp_min_array,
492 temp_max_array,
493 temp_avg_array,
494 precipitation_array,
495 wind_speed_array,
496 temp_quality_array,
497 precip_quality_array,
498 wind_quality_array,
499 temp_validation_array,
500 precip_validation_array,
501 wind_validation_array,
502 ],
503 )?;
504
505 Ok(batch)
506 }
507
508 pub fn read_sample_weather_records(
510 &self,
511 path: &Path,
512 limit: usize,
513 ) -> Result<Vec<WeatherRecord>> {
514 use arrow::array::*;
515 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
516
517 let file = File::open(path)?;
518 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
519 .with_batch_size(limit.min(8192))
520 .build()?;
521
522 let mut records = Vec::new();
523 let mut total_read = 0;
524
525 for batch_result in parquet_reader {
526 let batch = batch_result?;
527
528 let num_columns = batch.num_columns();
530
531 if num_columns < 13 {
532 return Ok(Vec::new());
534 }
535
536 let has_validation_fields = num_columns >= 16;
537
538 let station_ids = batch
540 .column(0)
541 .as_any()
542 .downcast_ref::<UInt32Array>()
543 .ok_or_else(|| {
544 crate::error::ProcessingError::Config(
545 "Invalid station_id column type".to_string(),
546 )
547 })?;
548 let station_names = batch
549 .column(1)
550 .as_any()
551 .downcast_ref::<StringArray>()
552 .ok_or_else(|| {
553 crate::error::ProcessingError::Config(
554 "Invalid station_name column type".to_string(),
555 )
556 })?;
557 let dates = batch
558 .column(2)
559 .as_any()
560 .downcast_ref::<Date32Array>()
561 .ok_or_else(|| {
562 crate::error::ProcessingError::Config("Invalid date column type".to_string())
563 })?;
564 let latitudes = batch
565 .column(3)
566 .as_any()
567 .downcast_ref::<Float64Array>()
568 .ok_or_else(|| {
569 crate::error::ProcessingError::Config(
570 "Invalid latitude column type".to_string(),
571 )
572 })?;
573 let longitudes = batch
574 .column(4)
575 .as_any()
576 .downcast_ref::<Float64Array>()
577 .ok_or_else(|| {
578 crate::error::ProcessingError::Config(
579 "Invalid longitude column type".to_string(),
580 )
581 })?;
582
583 let temp_mins = batch
585 .column(5)
586 .as_any()
587 .downcast_ref::<Float32Array>()
588 .ok_or_else(|| {
589 crate::error::ProcessingError::Config(
590 "Invalid temp_min column type".to_string(),
591 )
592 })?;
593 let temp_maxs = batch
594 .column(6)
595 .as_any()
596 .downcast_ref::<Float32Array>()
597 .ok_or_else(|| {
598 crate::error::ProcessingError::Config(
599 "Invalid temp_max column type".to_string(),
600 )
601 })?;
602 let temp_avgs = batch
603 .column(7)
604 .as_any()
605 .downcast_ref::<Float32Array>()
606 .ok_or_else(|| {
607 crate::error::ProcessingError::Config(
608 "Invalid temp_avg column type".to_string(),
609 )
610 })?;
611
612 let precipitations = batch
614 .column(8)
615 .as_any()
616 .downcast_ref::<Float32Array>()
617 .ok_or_else(|| {
618 crate::error::ProcessingError::Config(
619 "Invalid precipitation column type".to_string(),
620 )
621 })?;
622 let wind_speeds = batch
623 .column(9)
624 .as_any()
625 .downcast_ref::<Float32Array>()
626 .ok_or_else(|| {
627 crate::error::ProcessingError::Config(
628 "Invalid wind_speed column type".to_string(),
629 )
630 })?;
631
632 let temp_qualities = batch
634 .column(10)
635 .as_any()
636 .downcast_ref::<StringArray>()
637 .ok_or_else(|| {
638 crate::error::ProcessingError::Config(
639 "Invalid temp_quality column type".to_string(),
640 )
641 })?;
642 let precip_qualities = batch
643 .column(11)
644 .as_any()
645 .downcast_ref::<StringArray>()
646 .ok_or_else(|| {
647 crate::error::ProcessingError::Config(
648 "Invalid precip_quality column type".to_string(),
649 )
650 })?;
651 let wind_qualities = batch
652 .column(12)
653 .as_any()
654 .downcast_ref::<StringArray>()
655 .ok_or_else(|| {
656 crate::error::ProcessingError::Config(
657 "Invalid wind_quality column type".to_string(),
658 )
659 })?;
660
661 let temp_validations = if has_validation_fields {
663 Some(
664 batch
665 .column(13)
666 .as_any()
667 .downcast_ref::<StringArray>()
668 .ok_or_else(|| {
669 crate::error::ProcessingError::Config(
670 "Invalid temp_validation column type".to_string(),
671 )
672 })?,
673 )
674 } else {
675 None
676 };
677 let precip_validations = if has_validation_fields {
678 Some(
679 batch
680 .column(14)
681 .as_any()
682 .downcast_ref::<StringArray>()
683 .ok_or_else(|| {
684 crate::error::ProcessingError::Config(
685 "Invalid precip_validation column type".to_string(),
686 )
687 })?,
688 )
689 } else {
690 None
691 };
692 let wind_validations = if has_validation_fields {
693 Some(
694 batch
695 .column(15)
696 .as_any()
697 .downcast_ref::<StringArray>()
698 .ok_or_else(|| {
699 crate::error::ProcessingError::Config(
700 "Invalid wind_validation column type".to_string(),
701 )
702 })?,
703 )
704 } else {
705 None
706 };
707
708 let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
710
711 for i in 0..batch_records_to_read {
712 let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i))
713 .ok_or_else(|| {
714 crate::error::ProcessingError::Config(
715 "Invalid date in Parquet file".to_string(),
716 )
717 })?;
718
719 use crate::models::weather::PhysicalValidity;
720
721 let record = WeatherRecord::new_raw(
722 station_ids.value(i),
723 station_names.value(i).to_string(),
724 date,
725 latitudes.value(i),
726 longitudes.value(i),
727 if temp_mins.is_null(i) {
728 None
729 } else {
730 Some(temp_mins.value(i))
731 },
732 if temp_maxs.is_null(i) {
733 None
734 } else {
735 Some(temp_maxs.value(i))
736 },
737 if temp_avgs.is_null(i) {
738 None
739 } else {
740 Some(temp_avgs.value(i))
741 },
742 if precipitations.is_null(i) {
743 None
744 } else {
745 Some(precipitations.value(i))
746 },
747 if wind_speeds.is_null(i) {
748 None
749 } else {
750 Some(wind_speeds.value(i))
751 },
752 if temp_qualities.is_null(i) {
753 None
754 } else {
755 Some(temp_qualities.value(i).to_string())
756 },
757 if precip_qualities.is_null(i) {
758 None
759 } else {
760 Some(precip_qualities.value(i).to_string())
761 },
762 if wind_qualities.is_null(i) {
763 None
764 } else {
765 Some(wind_qualities.value(i).to_string())
766 },
767 temp_validations.and_then(|arr| {
769 if arr.is_null(i) {
770 None
771 } else {
772 PhysicalValidity::parse(arr.value(i))
773 }
774 }),
775 precip_validations.and_then(|arr| {
776 if arr.is_null(i) {
777 None
778 } else {
779 PhysicalValidity::parse(arr.value(i))
780 }
781 }),
782 wind_validations.and_then(|arr| {
783 if arr.is_null(i) {
784 None
785 } else {
786 PhysicalValidity::parse(arr.value(i))
787 }
788 }),
789 );
790
791 records.push(record);
792 total_read += 1;
793
794 if total_read >= limit {
795 break;
796 }
797 }
798
799 if total_read >= limit {
800 break;
801 }
802 }
803
804 Ok(records)
805 }
806
807 pub fn detect_schema_type(&self, path: &Path) -> Result<SchemaType> {
809 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
810
811 let file = File::open(path)?;
812 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
813 let schema = builder.schema();
814
815 let num_columns = schema.fields().len();
817
818 if num_columns == 9 {
819 Ok(SchemaType::ConsolidatedRecord)
821 } else if num_columns == 13 || num_columns == 16 {
822 Ok(SchemaType::WeatherRecord)
824 } else {
825 Ok(SchemaType::Unknown)
826 }
827 }
828
829 pub fn analyze_weather_dataset(
831 &self,
832 path: &Path,
833 sample_size: usize,
834 ) -> Result<WeatherDatasetSummary> {
835 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
836 use std::collections::HashSet;
837
838 let file = File::open(path)?;
839 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
840
841 let mut total_records = 0;
842 let mut stations: HashSet<u32> = HashSet::new();
843 let mut all_records = Vec::new();
844
845 let mut min_lat = f64::MAX;
847 let mut max_lat = f64::MIN;
848 let mut min_lon = f64::MAX;
849 let mut max_lon = f64::MIN;
850 let mut min_date = None;
851 let mut max_date = None;
852
853 let mut temp_records = 0;
855 let mut temp_stations: HashSet<u32> = HashSet::new();
856 let mut temp_dates = Vec::new();
857 let mut precip_records = 0;
858 let mut precip_stations: HashSet<u32> = HashSet::new();
859 let mut precip_dates = Vec::new();
860 let mut wind_records = 0;
861 let mut wind_stations: HashSet<u32> = HashSet::new();
862 let mut wind_dates = Vec::new();
863
864 let mut coldest_record: Option<WeatherRecord> = None;
866 let mut hottest_record: Option<WeatherRecord> = None;
867 let mut wettest_record: Option<WeatherRecord> = None;
868 let mut windiest_record: Option<WeatherRecord> = None;
869 let mut min_temp_val = f32::MAX;
870 let mut max_temp_val = f32::MIN;
871 let mut max_precip_val = f32::MIN;
872 let mut max_wind_val = f32::MIN;
873
874 let mut ecad_valid = 0;
876 let mut ecad_suspect = 0;
877 let mut ecad_missing = 0;
878 let mut physically_valid = 0;
879 let mut physically_suspect = 0;
880 let mut physically_invalid = 0;
881 let mut combined_valid = 0;
882 let mut combined_suspect_original = 0;
883 let mut combined_suspect_range = 0;
884 let mut combined_suspect_both = 0;
885 let mut combined_invalid = 0;
886 let mut combined_missing = 0;
887
888 for batch_result in parquet_reader {
889 let batch = batch_result?;
890 let num_rows = batch.num_rows();
891
892 if batch.num_columns() < 13 {
893 continue; }
895
896 let has_validation_fields = batch.num_columns() >= 16;
897
898 let station_ids = batch
900 .column(0)
901 .as_any()
902 .downcast_ref::<UInt32Array>()
903 .unwrap();
904 let station_names = batch
905 .column(1)
906 .as_any()
907 .downcast_ref::<StringArray>()
908 .unwrap();
909 let dates = batch
910 .column(2)
911 .as_any()
912 .downcast_ref::<Date32Array>()
913 .unwrap();
914 let latitudes = batch
915 .column(3)
916 .as_any()
917 .downcast_ref::<Float64Array>()
918 .unwrap();
919 let longitudes = batch
920 .column(4)
921 .as_any()
922 .downcast_ref::<Float64Array>()
923 .unwrap();
924 let temp_mins = batch
925 .column(5)
926 .as_any()
927 .downcast_ref::<Float32Array>()
928 .unwrap();
929 let temp_maxs = batch
930 .column(6)
931 .as_any()
932 .downcast_ref::<Float32Array>()
933 .unwrap();
934 let temp_avgs = batch
935 .column(7)
936 .as_any()
937 .downcast_ref::<Float32Array>()
938 .unwrap();
939 let precipitations = batch
940 .column(8)
941 .as_any()
942 .downcast_ref::<Float32Array>()
943 .unwrap();
944 let wind_speeds = batch
945 .column(9)
946 .as_any()
947 .downcast_ref::<Float32Array>()
948 .unwrap();
949 let temp_qualities = batch
950 .column(10)
951 .as_any()
952 .downcast_ref::<StringArray>()
953 .unwrap();
954 let precip_qualities = batch
955 .column(11)
956 .as_any()
957 .downcast_ref::<StringArray>()
958 .unwrap();
959 let wind_qualities = batch
960 .column(12)
961 .as_any()
962 .downcast_ref::<StringArray>()
963 .unwrap();
964
965 let temp_validations = if has_validation_fields {
967 Some(
968 batch
969 .column(13)
970 .as_any()
971 .downcast_ref::<StringArray>()
972 .unwrap(),
973 )
974 } else {
975 None
976 };
977 let precip_validations = if has_validation_fields {
978 Some(
979 batch
980 .column(14)
981 .as_any()
982 .downcast_ref::<StringArray>()
983 .unwrap(),
984 )
985 } else {
986 None
987 };
988 let wind_validations = if has_validation_fields {
989 Some(
990 batch
991 .column(15)
992 .as_any()
993 .downcast_ref::<StringArray>()
994 .unwrap(),
995 )
996 } else {
997 None
998 };
999
1000 for i in 0..num_rows {
1001 total_records += 1;
1002 let station_id = station_ids.value(i);
1003 stations.insert(station_id);
1004
1005 let date = chrono::NaiveDate::from_num_days_from_ce_opt(dates.value(i)).unwrap();
1006
1007 min_date = Some(min_date.map_or(date, |d: chrono::NaiveDate| d.min(date)));
1009 max_date = Some(max_date.map_or(date, |d: chrono::NaiveDate| d.max(date)));
1010
1011 let lat = latitudes.value(i);
1013 let lon = longitudes.value(i);
1014 min_lat = min_lat.min(lat);
1015 max_lat = max_lat.max(lat);
1016 min_lon = min_lon.min(lon);
1017 max_lon = max_lon.max(lon);
1018
1019 use crate::models::weather::{DataQuality, PhysicalValidity};
1021
1022 let record = WeatherRecord::new_raw(
1023 station_id,
1024 station_names.value(i).to_string(),
1025 date,
1026 lat,
1027 lon,
1028 if temp_mins.is_null(i) {
1029 None
1030 } else {
1031 Some(temp_mins.value(i))
1032 },
1033 if temp_maxs.is_null(i) {
1034 None
1035 } else {
1036 Some(temp_maxs.value(i))
1037 },
1038 if temp_avgs.is_null(i) {
1039 None
1040 } else {
1041 Some(temp_avgs.value(i))
1042 },
1043 if precipitations.is_null(i) {
1044 None
1045 } else {
1046 Some(precipitations.value(i))
1047 },
1048 if wind_speeds.is_null(i) {
1049 None
1050 } else {
1051 Some(wind_speeds.value(i))
1052 },
1053 if temp_qualities.is_null(i) {
1054 None
1055 } else {
1056 Some(temp_qualities.value(i).to_string())
1057 },
1058 if precip_qualities.is_null(i) {
1059 None
1060 } else {
1061 Some(precip_qualities.value(i).to_string())
1062 },
1063 if wind_qualities.is_null(i) {
1064 None
1065 } else {
1066 Some(wind_qualities.value(i).to_string())
1067 },
1068 temp_validations.and_then(|arr| {
1070 if arr.is_null(i) {
1071 None
1072 } else {
1073 PhysicalValidity::parse(arr.value(i))
1074 }
1075 }),
1076 precip_validations.and_then(|arr| {
1077 if arr.is_null(i) {
1078 None
1079 } else {
1080 PhysicalValidity::parse(arr.value(i))
1081 }
1082 }),
1083 wind_validations.and_then(|arr| {
1084 if arr.is_null(i) {
1085 None
1086 } else {
1087 PhysicalValidity::parse(arr.value(i))
1088 }
1089 }),
1090 );
1091
1092 if record.has_temperature_data() {
1094 temp_records += 1;
1095 temp_stations.insert(station_id);
1096 temp_dates.push(date);
1097
1098 let temp_quality = record.assess_temperature_quality();
1100 if !matches!(temp_quality, DataQuality::Invalid) {
1101 if let Some(min_temp) = record.temp_min {
1102 if min_temp < min_temp_val {
1103 min_temp_val = min_temp;
1104 coldest_record = Some(record.clone());
1105 }
1106 }
1107
1108 if let Some(max_temp) = record.temp_max {
1109 if max_temp > max_temp_val {
1110 max_temp_val = max_temp;
1111 hottest_record = Some(record.clone());
1112 }
1113 }
1114 }
1115 }
1116
1117 if record.has_precipitation() {
1118 precip_records += 1;
1119 precip_stations.insert(station_id);
1120 precip_dates.push(date);
1121
1122 let precip_quality = record.assess_precipitation_quality();
1124 if !matches!(precip_quality, DataQuality::Invalid) {
1125 if let Some(precip) = record.precipitation {
1126 if precip > max_precip_val {
1127 max_precip_val = precip;
1128 wettest_record = Some(record.clone());
1129 }
1130 }
1131 }
1132 }
1133
1134 if record.has_wind_speed() {
1135 wind_records += 1;
1136 wind_stations.insert(station_id);
1137 wind_dates.push(date);
1138
1139 let wind_quality = record.assess_wind_quality();
1141 if !matches!(wind_quality, DataQuality::Invalid) {
1142 if let Some(wind) = record.wind_speed {
1143 if wind > max_wind_val {
1144 max_wind_val = wind;
1145 windiest_record = Some(record.clone());
1146 }
1147 }
1148 }
1149 }
1150
1151 if record.has_temperature_data() {
1157 if let Some(temp_quality) = &record.temp_quality {
1158 if temp_quality.contains('0') {
1159 ecad_valid += 1;
1160 } else if temp_quality.contains('1') {
1161 ecad_suspect += 1;
1162 } else if temp_quality.contains('9') {
1163 ecad_missing += 1;
1164 }
1165 }
1166 }
1167
1168 if record.has_precipitation() {
1170 if let Some(precip_quality) = &record.precip_quality {
1171 if precip_quality == "0" {
1172 ecad_valid += 1;
1173 } else if precip_quality == "1" {
1174 ecad_suspect += 1;
1175 } else if precip_quality == "9" {
1176 ecad_missing += 1;
1177 }
1178 }
1179 }
1180
1181 if record.has_wind_speed() {
1183 if let Some(wind_quality) = &record.wind_quality {
1184 if wind_quality == "0" {
1185 ecad_valid += 1;
1186 } else if wind_quality == "1" {
1187 ecad_suspect += 1;
1188 } else if wind_quality == "9" {
1189 ecad_missing += 1;
1190 }
1191 }
1192 }
1193
1194 if let Some(validation) = record.temp_validation {
1196 match validation {
1197 PhysicalValidity::Valid => physically_valid += 1,
1198 PhysicalValidity::Suspect => physically_suspect += 1,
1199 PhysicalValidity::Invalid => physically_invalid += 1,
1200 }
1201 }
1202 if let Some(validation) = record.precip_validation {
1203 match validation {
1204 PhysicalValidity::Valid => physically_valid += 1,
1205 PhysicalValidity::Suspect => physically_suspect += 1,
1206 PhysicalValidity::Invalid => physically_invalid += 1,
1207 }
1208 }
1209 if let Some(validation) = record.wind_validation {
1210 match validation {
1211 PhysicalValidity::Valid => physically_valid += 1,
1212 PhysicalValidity::Suspect => physically_suspect += 1,
1213 PhysicalValidity::Invalid => physically_invalid += 1,
1214 }
1215 }
1216
1217 let temp_quality = record.assess_temperature_quality();
1219 let precip_quality = record.assess_precipitation_quality();
1220 let wind_quality = record.assess_wind_quality();
1221
1222 for quality in [temp_quality, precip_quality, wind_quality] {
1223 match quality {
1224 DataQuality::Valid => combined_valid += 1,
1225 DataQuality::SuspectOriginal => combined_suspect_original += 1,
1226 DataQuality::SuspectRange => combined_suspect_range += 1,
1227 DataQuality::SuspectBoth => combined_suspect_both += 1,
1228 DataQuality::Invalid => combined_invalid += 1,
1229 DataQuality::Missing => combined_missing += 1,
1230 }
1231 }
1232
1233 all_records.push(record);
1235 }
1236 }
1237
1238 let sample_records = self.create_diverse_sample(&all_records, sample_size);
1240
1241 temp_dates.sort();
1243 precip_dates.sort();
1244 wind_dates.sort();
1245
1246 let temperature_range = if !temp_dates.is_empty() {
1247 Some((temp_dates[0], temp_dates[temp_dates.len() - 1]))
1248 } else {
1249 None
1250 };
1251
1252 let precipitation_range = if !precip_dates.is_empty() {
1253 Some((precip_dates[0], precip_dates[precip_dates.len() - 1]))
1254 } else {
1255 None
1256 };
1257
1258 let wind_range = if !wind_dates.is_empty() {
1259 Some((wind_dates[0], wind_dates[wind_dates.len() - 1]))
1260 } else {
1261 None
1262 };
1263
1264 let countries = if min_lon < -5.0 && max_lat > 53.0 {
1266 vec!["GB".to_string(), "IE".to_string()]
1267 } else if max_lat > 55.0 {
1268 vec!["GB".to_string()]
1269 } else {
1270 vec!["IE".to_string()]
1271 };
1272
1273 Ok(WeatherDatasetSummary {
1274 total_records,
1275 total_stations: stations.len(),
1276 geographic_bounds: GeographicBounds {
1277 min_lat,
1278 max_lat,
1279 min_lon,
1280 max_lon,
1281 countries,
1282 },
1283 temporal_coverage: TemporalCoverage {
1284 overall_start: min_date
1285 .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap()),
1286 overall_end: max_date
1287 .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(2000, 1, 1).unwrap()),
1288 temperature_range,
1289 precipitation_range,
1290 wind_range,
1291 },
1292 metric_statistics: MetricStatistics {
1293 temperature_records: temp_records,
1294 temperature_stations: temp_stations.len(),
1295 precipitation_records: precip_records,
1296 precipitation_stations: precip_stations.len(),
1297 wind_records,
1298 wind_stations: wind_stations.len(),
1299 temperature_range: if min_temp_val != f32::MAX && max_temp_val != f32::MIN {
1300 Some((min_temp_val, max_temp_val))
1301 } else {
1302 None
1303 },
1304 precipitation_range: if max_precip_val != f32::MIN {
1305 Some((0.0, max_precip_val))
1306 } else {
1307 None
1308 },
1309 wind_range: if max_wind_val != f32::MIN {
1310 Some((0.0, max_wind_val))
1311 } else {
1312 None
1313 },
1314 },
1315 data_quality: EnhancedDataQuality {
1316 ecad_valid,
1317 ecad_suspect,
1318 ecad_missing,
1319 physically_valid,
1320 physically_suspect,
1321 physically_invalid,
1322 combined_valid,
1323 combined_suspect_original,
1324 combined_suspect_range,
1325 combined_suspect_both,
1326 combined_invalid,
1327 combined_missing,
1328 validation_errors: 0, },
1330 sample_records,
1331 extreme_records: ExtremeRecords {
1332 coldest: coldest_record,
1333 hottest: hottest_record,
1334 wettest: wettest_record,
1335 windiest: windiest_record,
1336 },
1337 })
1338 }
1339
1340 fn create_diverse_sample(
1342 &self,
1343 all_records: &[WeatherRecord],
1344 sample_size: usize,
1345 ) -> Vec<WeatherRecord> {
1346 use std::collections::HashMap;
1347
1348 if all_records.is_empty() || sample_size == 0 {
1349 return Vec::new();
1350 }
1351
1352 let mut samples = Vec::new();
1353 let mut station_counts: HashMap<String, usize> = HashMap::new();
1354
1355 let total_records = all_records.len();
1357 let step = if total_records > sample_size * 100 {
1358 total_records / (sample_size * 50) } else {
1360 std::cmp::max(1, total_records / sample_size)
1361 };
1362
1363 for (i, record) in all_records.iter().enumerate() {
1364 if i % step == 0 && samples.len() < sample_size {
1365 let station_count = station_counts
1367 .entry(record.station_name.clone())
1368 .or_insert(0);
1369 if *station_count < 2 {
1370 samples.push(record.clone());
1372 *station_count += 1;
1373 }
1374 }
1375 }
1376
1377 if samples.len() < sample_size {
1379 for record in all_records.iter().step_by(step * 2) {
1380 if samples.len() >= sample_size {
1381 break;
1382 }
1383 if !samples
1384 .iter()
1385 .any(|s| s.station_name == record.station_name && s.date == record.date)
1386 {
1387 samples.push(record.clone());
1388 }
1389 }
1390 }
1391
1392 samples.truncate(sample_size);
1393 samples
1394 }
1395}
1396
/// Which record schema a parquet file's column layout corresponds to.
///
/// A data-less discriminant, so it is cheaply `Copy`; `Eq` accompanies
/// `PartialEq` since equality is total for these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaType {
    /// Column layout produced from `ConsolidatedRecord` rows.
    ConsolidatedRecord,
    /// Column layout produced from `WeatherRecord` rows.
    WeatherRecord,
    /// Column layout matching neither known schema.
    Unknown,
}
1403
/// Full analysis of a unified weather dataset: totals, geographic and
/// temporal extent, per-metric statistics, quality tallies, a display
/// sample, and the extreme observations found.
#[derive(Debug, Clone)]
pub struct WeatherDatasetSummary {
    /// Total number of weather records analysed.
    pub total_records: usize,
    /// Number of distinct stations seen across all records.
    pub total_stations: usize,
    /// Bounding box and inferred country list covering all stations.
    pub geographic_bounds: GeographicBounds,
    /// Overall and per-metric date coverage.
    pub temporal_coverage: TemporalCoverage,
    /// Per-metric record/station counts and observed value ranges.
    pub metric_statistics: MetricStatistics,
    /// Quality counters at ECAD-flag, physical, and combined levels.
    pub data_quality: EnhancedDataQuality,
    /// Diverse sample of records (spread across stations) used for display.
    pub sample_records: Vec<WeatherRecord>,
    /// Coldest / hottest / wettest / windiest observations, where present.
    pub extreme_records: ExtremeRecords,
}
1415
/// Geographic bounding box of the dataset's stations, in decimal degrees.
#[derive(Debug, Clone)]
pub struct GeographicBounds {
    /// Southernmost station latitude.
    pub min_lat: f64,
    /// Northernmost station latitude.
    pub max_lat: f64,
    /// Westernmost station longitude (negative values lie west of Greenwich).
    pub min_lon: f64,
    /// Easternmost station longitude.
    pub max_lon: f64,
    /// Country codes inferred from the bounding box (e.g. "GB", "IE").
    pub countries: Vec<String>,
}
1424
/// Date coverage of the dataset, overall and per metric.
#[derive(Debug, Clone)]
pub struct TemporalCoverage {
    /// Earliest record date (builder falls back to 1900-01-01 when no
    /// dates were observed).
    pub overall_start: chrono::NaiveDate,
    /// Latest record date (builder falls back to 2000-01-01 when no
    /// dates were observed).
    pub overall_end: chrono::NaiveDate,
    /// (first, last) dates carrying temperature data, if any.
    pub temperature_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    /// (first, last) dates carrying precipitation data, if any.
    pub precipitation_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    /// (first, last) dates carrying wind-speed data, if any.
    pub wind_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
}
1433
/// Per-metric record/station counts and observed value ranges.
#[derive(Debug, Clone)]
pub struct MetricStatistics {
    /// Records carrying temperature data.
    pub temperature_records: usize,
    /// Distinct stations reporting temperature.
    pub temperature_stations: usize,
    /// Records carrying precipitation data.
    pub precipitation_records: usize,
    /// Distinct stations reporting precipitation.
    pub precipitation_stations: usize,
    /// Records carrying wind-speed data.
    pub wind_records: usize,
    /// Distinct stations reporting wind speed.
    pub wind_stations: usize,
    /// (min, max) observed temperature values, if any data was present.
    pub temperature_range: Option<(f32, f32)>,
    /// (0.0, max) observed precipitation — the lower bound is fixed at zero
    /// by the builder.
    pub precipitation_range: Option<(f32, f32)>,
    /// (0.0, max) observed wind speed — the lower bound is fixed at zero
    /// by the builder.
    pub wind_range: Option<(f32, f32)>,
}
1446
/// Data-quality counters accumulated at three levels of assessment:
/// raw ECAD flags, physical-range validation, and the combined verdict.
#[derive(Debug, Clone)]
pub struct EnhancedDataQuality {
    /// Observations whose ECAD quality flag is "0" (valid).
    pub ecad_valid: usize,
    /// Observations whose ECAD quality flag is "1" (suspect).
    pub ecad_suspect: usize,
    /// Observations whose ECAD quality flag is "9" (missing).
    pub ecad_missing: usize,

    /// Values classified `PhysicalValidity::Valid`.
    pub physically_valid: usize,
    /// Values classified `PhysicalValidity::Suspect`.
    pub physically_suspect: usize,
    /// Values classified `PhysicalValidity::Invalid` (physically impossible).
    pub physically_invalid: usize,

    /// Metric assessments yielding `DataQuality::Valid`.
    pub combined_valid: usize,
    /// Metric assessments yielding `DataQuality::SuspectOriginal`.
    pub combined_suspect_original: usize,
    /// Metric assessments yielding `DataQuality::SuspectRange`.
    pub combined_suspect_range: usize,
    /// Metric assessments yielding `DataQuality::SuspectBoth`.
    pub combined_suspect_both: usize,
    /// Metric assessments yielding `DataQuality::Invalid`.
    pub combined_invalid: usize,
    /// Metric assessments yielding `DataQuality::Missing`.
    pub combined_missing: usize,

    /// Validation errors encountered while building the summary
    /// (currently always set to 0 by the builder).
    pub validation_errors: usize,
}
1469
/// Record-holding observations for each metric, when present in the data.
#[derive(Debug, Clone)]
pub struct ExtremeRecords {
    /// Record with the lowest minimum temperature.
    pub coldest: Option<WeatherRecord>,
    /// Record with the highest maximum temperature.
    pub hottest: Option<WeatherRecord>,
    /// Record with the highest precipitation total.
    pub wettest: Option<WeatherRecord>,
    /// Record with the highest wind speed.
    pub windiest: Option<WeatherRecord>,
}
1477
1478impl WeatherDatasetSummary {
1479 pub fn display_comprehensive_summary(&self) -> String {
1480 let mut summary = String::new();
1481
1482 summary.push_str("UNIFIED WEATHER DATASET ANALYSIS\n");
1484 summary.push_str("================================\n\n");
1485
1486 summary.push_str(&format!(
1488 "Dataset Overview:\n\
1489 - Records: {} unified weather records\n\
1490 - Stations: {} across {} ({:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E)\n\
1491 - Timespan: {} to {} ({} years)\n\n",
1492 self.total_records,
1493 self.total_stations,
1494 self.geographic_bounds.countries.join("/"),
1495 self.geographic_bounds.min_lat,
1496 self.geographic_bounds.max_lat,
1497 if self.geographic_bounds.min_lon < 0.0 {
1498 -self.geographic_bounds.min_lon
1499 } else {
1500 self.geographic_bounds.min_lon
1501 },
1502 self.geographic_bounds.max_lon,
1503 self.temporal_coverage.overall_start,
1504 self.temporal_coverage.overall_end,
1505 self.temporal_coverage.overall_end.year() - self.temporal_coverage.overall_start.year()
1506 ));
1507
1508 summary.push_str("Metric Coverage:\n");
1510 summary.push_str(
1511 "┌─────────────────┬──────────┬─────────────┬──────────────┬─────────────┐\n",
1512 );
1513 summary.push_str(
1514 "│ Metric │ Stations │ Records │ Coverage │ Date Range │\n",
1515 );
1516 summary.push_str(
1517 "├─────────────────┼──────────┼─────────────┼──────────────┼─────────────┤\n",
1518 );
1519
1520 let temp_coverage = if self.total_records > 0 {
1522 (self.metric_statistics.temperature_records as f32 / self.total_records as f32) * 100.0
1523 } else {
1524 0.0
1525 };
1526
1527 let temp_range = if let Some((start, end)) = self.temporal_coverage.temperature_range {
1528 format!("{}-{}", start.year(), end.year())
1529 } else {
1530 "N/A".to_string()
1531 };
1532
1533 summary.push_str(&format!(
1534 "│ Temperature │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1535 self.metric_statistics.temperature_stations,
1536 self.metric_statistics.temperature_records,
1537 temp_coverage,
1538 temp_range
1539 ));
1540
1541 let precip_coverage = if self.total_records > 0 {
1543 (self.metric_statistics.precipitation_records as f32 / self.total_records as f32)
1544 * 100.0
1545 } else {
1546 0.0
1547 };
1548
1549 let precip_range = if let Some((start, end)) = self.temporal_coverage.precipitation_range {
1550 format!("{}-{}", start.year(), end.year())
1551 } else {
1552 "N/A".to_string()
1553 };
1554
1555 summary.push_str(&format!(
1556 "│ Precipitation │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1557 self.metric_statistics.precipitation_stations,
1558 self.metric_statistics.precipitation_records,
1559 precip_coverage,
1560 precip_range
1561 ));
1562
1563 let wind_coverage = if self.total_records > 0 {
1565 (self.metric_statistics.wind_records as f32 / self.total_records as f32) * 100.0
1566 } else {
1567 0.0
1568 };
1569
1570 let wind_range = if let Some((start, end)) = self.temporal_coverage.wind_range {
1571 format!("{}-{}", start.year(), end.year())
1572 } else {
1573 "N/A".to_string()
1574 };
1575
1576 summary.push_str(&format!(
1577 "│ Wind Speed │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1578 self.metric_statistics.wind_stations,
1579 self.metric_statistics.wind_records,
1580 wind_coverage,
1581 wind_range
1582 ));
1583
1584 summary.push_str(
1585 "└─────────────────┴──────────┴─────────────┴──────────────┴─────────────┘\n\n",
1586 );
1587
1588 if !self.sample_records.is_empty() {
1590 summary.push_str("Sample Records (diverse stations & metrics):\n");
1591 for (i, record) in self.sample_records.iter().enumerate() {
1592 let mut metrics_display = Vec::new();
1593
1594 let temp_parts: Vec<String> = [
1596 record.temp_min.map(|t| format!("min={:.1}°C", t)),
1597 record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
1598 record.temp_max.map(|t| format!("max={:.1}°C", t)),
1599 ]
1600 .into_iter()
1601 .flatten()
1602 .collect();
1603
1604 if !temp_parts.is_empty() {
1605 metrics_display.push(format!("temp({})", temp_parts.join(", ")));
1606 }
1607
1608 if let Some(precip) = record.precipitation {
1609 metrics_display.push(format!("precip={:.1}mm", precip));
1610 }
1611
1612 if let Some(wind) = record.wind_speed {
1613 metrics_display.push(format!("wind={:.1}m/s", wind));
1614 }
1615
1616 let metrics_str = if metrics_display.is_empty() {
1617 "no data".to_string()
1618 } else {
1619 metrics_display.join(", ")
1620 };
1621
1622 summary.push_str(&format!(
1623 "{}. {} on {}: {}\n",
1624 i + 1,
1625 record.station_name,
1626 record.date,
1627 metrics_str
1628 ));
1629 }
1630 summary.push('\n');
1631 }
1632
1633 summary.push_str("Extreme Records:\n");
1635 if let Some(ref coldest) = self.extreme_records.coldest {
1636 if let Some(min_temp) = coldest.temp_min {
1637 summary.push_str(&format!(
1638 "- Coldest: {:.1}°C at {} ({})\n",
1639 min_temp, coldest.station_name, coldest.date
1640 ));
1641 }
1642 }
1643
1644 if let Some(ref hottest) = self.extreme_records.hottest {
1645 if let Some(max_temp) = hottest.temp_max {
1646 summary.push_str(&format!(
1647 "- Hottest: {:.1}°C at {} ({})\n",
1648 max_temp, hottest.station_name, hottest.date
1649 ));
1650 }
1651 }
1652
1653 if let Some(ref wettest) = self.extreme_records.wettest {
1654 if let Some(precip) = wettest.precipitation {
1655 summary.push_str(&format!(
1656 "- Wettest: {:.1}mm at {} ({})\n",
1657 precip, wettest.station_name, wettest.date
1658 ));
1659 }
1660 }
1661
1662 if let Some(ref windiest) = self.extreme_records.windiest {
1663 if let Some(wind) = windiest.wind_speed {
1664 summary.push_str(&format!(
1665 "- Windiest: {:.1}m/s at {} ({})\n",
1666 wind, windiest.station_name, windiest.date
1667 ));
1668 }
1669 }
1670 summary.push('\n');
1671
1672 summary.push_str("Data Quality Analysis:\n");
1674
1675 let total_ecad = self.data_quality.ecad_valid
1676 + self.data_quality.ecad_suspect
1677 + self.data_quality.ecad_missing;
1678 if total_ecad > 0 {
1679 summary.push_str("├─ ECAD Assessment:\n");
1680 summary.push_str(&format!(
1681 "│ ├─ Valid (flag=0): {} ({:.1}%)\n",
1682 self.data_quality.ecad_valid,
1683 (self.data_quality.ecad_valid as f32 / total_ecad as f32) * 100.0
1684 ));
1685 summary.push_str(&format!(
1686 "│ ├─ Suspect (flag=1): {} ({:.1}%)\n",
1687 self.data_quality.ecad_suspect,
1688 (self.data_quality.ecad_suspect as f32 / total_ecad as f32) * 100.0
1689 ));
1690 summary.push_str(&format!(
1691 "│ └─ Missing (flag=9): {} ({:.1}%)\n",
1692 self.data_quality.ecad_missing,
1693 (self.data_quality.ecad_missing as f32 / total_ecad as f32) * 100.0
1694 ));
1695 summary.push_str("│\n");
1696 }
1697
1698 let total_physical = self.data_quality.physically_valid
1699 + self.data_quality.physically_suspect
1700 + self.data_quality.physically_invalid;
1701 if total_physical > 0 {
1702 summary.push_str("├─ Physical Validation:\n");
1703 summary.push_str(&format!(
1704 "│ ├─ Valid: {} ({:.1}%)\n",
1705 self.data_quality.physically_valid,
1706 (self.data_quality.physically_valid as f32 / total_physical as f32) * 100.0
1707 ));
1708 summary.push_str(&format!(
1709 "│ ├─ Suspect: {} ({:.1}%)\n",
1710 self.data_quality.physically_suspect,
1711 (self.data_quality.physically_suspect as f32 / total_physical as f32) * 100.0
1712 ));
1713 summary.push_str(&format!(
1714 "│ └─ Invalid: {} ({:.3}%)\n",
1715 self.data_quality.physically_invalid,
1716 (self.data_quality.physically_invalid as f32 / total_physical as f32) * 100.0
1717 ));
1718 summary.push_str("│\n");
1719 }
1720
1721 let total_combined = self.data_quality.combined_valid
1722 + self.data_quality.combined_suspect_original
1723 + self.data_quality.combined_suspect_range
1724 + self.data_quality.combined_suspect_both
1725 + self.data_quality.combined_invalid
1726 + self.data_quality.combined_missing;
1727 if total_combined > 0 {
1728 summary.push_str("└─ Combined Quality:\n");
1729 summary.push_str(&format!(
1730 " ├─ Valid: {} ({:.1}%)\n",
1731 self.data_quality.combined_valid,
1732 (self.data_quality.combined_valid as f32 / total_combined as f32) * 100.0
1733 ));
1734 summary.push_str(&format!(
1735 " ├─ Suspect (original): {} ({:.1}%)\n",
1736 self.data_quality.combined_suspect_original,
1737 (self.data_quality.combined_suspect_original as f32 / total_combined as f32)
1738 * 100.0
1739 ));
1740 summary.push_str(&format!(
1741 " ├─ Suspect (range): {} ({:.2}%)\n",
1742 self.data_quality.combined_suspect_range,
1743 (self.data_quality.combined_suspect_range as f32 / total_combined as f32) * 100.0
1744 ));
1745 if self.data_quality.combined_invalid > 0 {
1746 summary.push_str(&format!(
1747 " ├─ Invalid: {} ({:.3}%)\n",
1748 self.data_quality.combined_invalid,
1749 (self.data_quality.combined_invalid as f32 / total_combined as f32) * 100.0
1750 ));
1751 }
1752 summary.push_str(&format!(
1753 " └─ Missing: {} ({:.1}%)\n",
1754 self.data_quality.combined_missing,
1755 (self.data_quality.combined_missing as f32 / total_combined as f32) * 100.0
1756 ));
1757 }
1758
1759 if self.data_quality.physically_invalid > 0 {
1760 summary.push_str(&format!(
1761 "\n⚠️ Found {} physically impossible values that were excluded from extreme records analysis\n",
1762 self.data_quality.physically_invalid
1763 ));
1764 }
1765
1766 summary
1767 }
1768}
1769
1770impl Default for ParquetWriter {
1771 fn default() -> Self {
1772 Self::new()
1773 }
1774}
1775
/// Layout metadata describing a written parquet file.
#[derive(Debug)]
pub struct ParquetFileInfo {
    /// Total number of rows across all row groups.
    pub total_rows: i64,
    /// Number of row groups in the file.
    pub row_groups: i32,
    /// Row count of each individual row group, in file order.
    pub row_group_sizes: Vec<i64>,
    /// On-disk file size in bytes.
    pub file_size: u64,
    /// Compression codec the file was written with.
    pub compression: Compression,
}
1784
1785impl ParquetFileInfo {
1786 pub fn summary(&self) -> String {
1787 format!(
1788 "Parquet File Summary:\n\
1789 - Total rows: {}\n\
1790 - Row groups: {}\n\
1791 - File size: {:.2} MB\n\
1792 - Compression: {:?}\n\
1793 - Avg rows per group: {:.0}",
1794 self.total_rows,
1795 self.row_groups,
1796 self.file_size as f64 / 1_048_576.0, self.compression,
1798 self.total_rows as f64 / self.row_groups as f64
1799 )
1800 }
1801}
1802
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ConsolidatedRecord;
    use chrono::NaiveDate;
    use tempfile::NamedTempFile;

    /// Build the single fixture record shared by the write tests
    /// (previously duplicated verbatim in two tests).
    fn sample_record() -> ConsolidatedRecord {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        ConsolidatedRecord::new(
            12345,
            "Test Station".to_string(),
            date,
            51.5074,
            -0.1278,
            15.0,
            25.0,
            20.0,
            "000".to_string(),
        )
    }

    /// Writing an empty slice must succeed (and produce no output work).
    #[test]
    fn test_write_empty_records() {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        let result = writer.write_records(&[], temp_file.path());
        assert!(result.is_ok());
    }

    /// A single record writes to a non-empty parquet file.
    #[test]
    fn test_write_single_record() -> Result<()> {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        writer.write_records(&[sample_record()], temp_file.path())?;

        // At minimum the parquet footer must have been written.
        let metadata = std::fs::metadata(temp_file.path())?;
        assert!(metadata.len() > 0);

        Ok(())
    }

    /// Every supported compression codec must produce a writable file.
    #[test]
    fn test_different_compressions() -> Result<()> {
        let compressions = ["snappy", "gzip", "lz4", "zstd", "none"];

        for compression in &compressions {
            let writer = ParquetWriter::new().with_compression(compression)?;
            let temp_file = NamedTempFile::new().unwrap();

            let result = writer.write_records(&[sample_record()], temp_file.path());
            assert!(result.is_ok(), "Failed with compression: {}", compression);
        }

        Ok(())
    }
}