1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, WeatherRecord};
3use crate::utils::constants::DEFAULT_ROW_GROUP_SIZE;
4use arrow::array::*;
5use arrow::datatypes::{DataType, Field, Schema};
6use arrow::record_batch::RecordBatch;
7use chrono::Datelike;
8use parquet::arrow::ArrowWriter;
9use parquet::basic::{Compression, GzipLevel};
10use parquet::file::properties::WriterProperties;
11use std::fs::File;
12use std::path::Path;
13use std::sync::Arc;
14
/// Writes weather datasets to Parquet files with a configurable
/// compression codec and row-group size, and reads samples back.
pub struct ParquetWriter {
    /// Parquet compression codec applied to all columns (SNAPPY by default).
    compression: Compression,
    /// Maximum number of rows per Parquet row group.
    row_group_size: usize,
}
19
20impl ParquetWriter {
21 pub fn new() -> Self {
22 Self {
23 compression: Compression::SNAPPY,
24 row_group_size: DEFAULT_ROW_GROUP_SIZE,
25 }
26 }
27
28 pub fn with_compression(mut self, compression: &str) -> Result<Self> {
29 self.compression = match compression.to_lowercase().as_str() {
30 "snappy" => Compression::SNAPPY,
31 "gzip" => Compression::GZIP(GzipLevel::default()),
32 "lz4" => Compression::LZ4,
33 "zstd" => Compression::ZSTD(parquet::basic::ZstdLevel::default()),
34 "none" => Compression::UNCOMPRESSED,
35 _ => {
36 return Err(crate::error::ProcessingError::Config(format!(
37 "Unsupported compression: {}",
38 compression
39 )))
40 }
41 };
42 Ok(self)
43 }
44
45 pub fn with_row_group_size(mut self, size: usize) -> Self {
46 self.row_group_size = size;
47 self
48 }
49
50 pub fn write_records(&self, records: &[ConsolidatedRecord], path: &Path) -> Result<()> {
52 if records.is_empty() {
53 return Ok(());
54 }
55
56 let schema = self.create_schema();
57 let batch = self.records_to_batch(records, schema.clone())?;
58
59 let file = File::create(path)?;
60 let props = WriterProperties::builder()
61 .set_compression(self.compression)
62 .set_max_row_group_size(self.row_group_size)
63 .build();
64
65 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
66 writer.write(&batch)?;
67 writer.close()?;
68
69 Ok(())
70 }
71
72 pub fn write_records_batched(
74 &self,
75 records: &[ConsolidatedRecord],
76 path: &Path,
77 batch_size: usize,
78 ) -> Result<()> {
79 if records.is_empty() {
80 return Ok(());
81 }
82
83 let schema = self.create_schema();
84 let file = File::create(path)?;
85 let props = WriterProperties::builder()
86 .set_compression(self.compression)
87 .set_max_row_group_size(self.row_group_size)
88 .build();
89
90 let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
91
92 for chunk in records.chunks(batch_size) {
94 let batch = self.records_to_batch(chunk, schema.clone())?;
95 writer.write(&batch)?;
96 }
97
98 writer.close()?;
99 Ok(())
100 }
101
102 fn create_schema(&self) -> Arc<Schema> {
104 let fields = vec![
105 Field::new("station_id", DataType::UInt32, false),
106 Field::new("station_name", DataType::Utf8, false),
107 Field::new("date", DataType::Date32, false),
108 Field::new("latitude", DataType::Float64, false),
109 Field::new("longitude", DataType::Float64, false),
110 Field::new("min_temp", DataType::Float32, false),
111 Field::new("max_temp", DataType::Float32, false),
112 Field::new("avg_temp", DataType::Float32, false),
113 Field::new("quality_flags", DataType::Utf8, false),
114 ];
115
116 Arc::new(Schema::new(fields))
117 }
118
119 fn records_to_batch(
121 &self,
122 records: &[ConsolidatedRecord],
123 schema: Arc<Schema>,
124 ) -> Result<RecordBatch> {
125 let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
127 let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
128 let dates: Vec<i32> = records
129 .iter()
130 .map(|r| {
131 r.date
132 .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
133 .num_days() as i32
134 })
135 .collect();
136 let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
137 let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
138 let min_temps: Vec<f32> = records.iter().map(|r| r.min_temp).collect();
139 let max_temps: Vec<f32> = records.iter().map(|r| r.max_temp).collect();
140 let avg_temps: Vec<f32> = records.iter().map(|r| r.avg_temp).collect();
141 let quality_flags: Vec<String> = records.iter().map(|r| r.quality_flags.clone()).collect();
142
143 let station_id_array = Arc::new(UInt32Array::from(station_ids));
145 let station_name_array = Arc::new(StringArray::from(station_names));
146 let date_array = Arc::new(Date32Array::from(dates));
147 let latitude_array = Arc::new(Float64Array::from(latitudes));
148 let longitude_array = Arc::new(Float64Array::from(longitudes));
149 let min_temp_array = Arc::new(Float32Array::from(min_temps));
150 let max_temp_array = Arc::new(Float32Array::from(max_temps));
151 let avg_temp_array = Arc::new(Float32Array::from(avg_temps));
152 let quality_flags_array = Arc::new(StringArray::from(quality_flags));
153
154 let batch = RecordBatch::try_new(
156 schema,
157 vec![
158 station_id_array,
159 station_name_array,
160 date_array,
161 latitude_array,
162 longitude_array,
163 min_temp_array,
164 max_temp_array,
165 avg_temp_array,
166 quality_flags_array,
167 ],
168 )?;
169
170 Ok(batch)
171 }
172
173 pub fn read_sample_records(
175 &self,
176 path: &Path,
177 limit: usize,
178 ) -> Result<Vec<ConsolidatedRecord>> {
179 use arrow::array::*;
180 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
181
182 let file = File::open(path)?;
183 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
184 .with_batch_size(limit.min(8192))
185 .build()?;
186
187 let mut records = Vec::new();
188 let mut total_read = 0;
189
190 for batch_result in parquet_reader {
191 let batch = batch_result?;
192
193 let station_ids = batch
195 .column(0)
196 .as_any()
197 .downcast_ref::<UInt32Array>()
198 .ok_or_else(|| {
199 crate::error::ProcessingError::Config(
200 "Invalid station_id column type".to_string(),
201 )
202 })?;
203 let station_names = batch
204 .column(1)
205 .as_any()
206 .downcast_ref::<StringArray>()
207 .ok_or_else(|| {
208 crate::error::ProcessingError::Config(
209 "Invalid station_name column type".to_string(),
210 )
211 })?;
212 let dates = batch
213 .column(2)
214 .as_any()
215 .downcast_ref::<Date32Array>()
216 .ok_or_else(|| {
217 crate::error::ProcessingError::Config("Invalid date column type".to_string())
218 })?;
219 let latitudes = batch
220 .column(3)
221 .as_any()
222 .downcast_ref::<Float64Array>()
223 .ok_or_else(|| {
224 crate::error::ProcessingError::Config(
225 "Invalid latitude column type".to_string(),
226 )
227 })?;
228 let longitudes = batch
229 .column(4)
230 .as_any()
231 .downcast_ref::<Float64Array>()
232 .ok_or_else(|| {
233 crate::error::ProcessingError::Config(
234 "Invalid longitude column type".to_string(),
235 )
236 })?;
237 let min_temps = batch
238 .column(5)
239 .as_any()
240 .downcast_ref::<Float32Array>()
241 .ok_or_else(|| {
242 crate::error::ProcessingError::Config(
243 "Invalid min_temp column type".to_string(),
244 )
245 })?;
246 let max_temps = batch
247 .column(6)
248 .as_any()
249 .downcast_ref::<Float32Array>()
250 .ok_or_else(|| {
251 crate::error::ProcessingError::Config(
252 "Invalid max_temp column type".to_string(),
253 )
254 })?;
255 let avg_temps = batch
256 .column(7)
257 .as_any()
258 .downcast_ref::<Float32Array>()
259 .ok_or_else(|| {
260 crate::error::ProcessingError::Config(
261 "Invalid avg_temp column type".to_string(),
262 )
263 })?;
264 let quality_flags = batch
265 .column(8)
266 .as_any()
267 .downcast_ref::<StringArray>()
268 .ok_or_else(|| {
269 crate::error::ProcessingError::Config(
270 "Invalid quality_flags column type".to_string(),
271 )
272 })?;
273
274 let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
276
277 for i in 0..batch_records_to_read {
278 let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
279 + chrono::Duration::days(dates.value(i) as i64);
280
281 let record = ConsolidatedRecord::new(
282 station_ids.value(i),
283 station_names.value(i).to_string(),
284 date,
285 latitudes.value(i),
286 longitudes.value(i),
287 min_temps.value(i),
288 max_temps.value(i),
289 avg_temps.value(i),
290 quality_flags.value(i).to_string(),
291 );
292
293 records.push(record);
294 total_read += 1;
295
296 if total_read >= limit {
297 break;
298 }
299 }
300
301 if total_read >= limit {
302 break;
303 }
304 }
305
306 Ok(records)
307 }
308
309 pub fn get_file_info(&self, path: &Path) -> Result<ParquetFileInfo> {
311 use parquet::file::reader::{FileReader, SerializedFileReader};
312 use std::fs::File;
313
314 let file = File::open(path)?;
315 let reader = SerializedFileReader::new(file)?;
316 let metadata = reader.metadata();
317
318 let file_metadata = metadata.file_metadata();
319 let row_groups = metadata.num_row_groups();
320 let total_rows = file_metadata.num_rows();
321 let file_size = std::fs::metadata(path)?.len();
322
323 let mut row_group_sizes = Vec::new();
324 for i in 0..row_groups {
325 let rg_metadata = metadata.row_group(i);
326 row_group_sizes.push(rg_metadata.num_rows());
327 }
328
329 Ok(ParquetFileInfo {
330 total_rows,
331 row_groups: row_groups as i32,
332 row_group_sizes,
333 file_size,
334 compression: self.compression,
335 })
336 }
337
338 pub fn write_weather_records(&self, records: &[WeatherRecord], path: &Path) -> Result<()> {
340 if records.is_empty() {
341 return Ok(());
342 }
343
344 let schema = self.create_weather_schema();
345 let batch = self.weather_records_to_batch(records, schema.clone())?;
346
347 let file = File::create(path)?;
348 let props = WriterProperties::builder()
349 .set_compression(self.compression)
350 .set_max_row_group_size(self.row_group_size)
351 .build();
352
353 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
354 writer.write(&batch)?;
355 writer.close()?;
356
357 Ok(())
358 }
359
360 pub fn write_weather_records_batched(
362 &self,
363 records: &[WeatherRecord],
364 path: &Path,
365 batch_size: usize,
366 ) -> Result<()> {
367 if records.is_empty() {
368 return Ok(());
369 }
370
371 let schema = self.create_weather_schema();
372 let file = File::create(path)?;
373 let props = WriterProperties::builder()
374 .set_compression(self.compression)
375 .set_max_row_group_size(self.row_group_size)
376 .build();
377
378 let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
379
380 for chunk in records.chunks(batch_size) {
382 let batch = self.weather_records_to_batch(chunk, schema.clone())?;
383 writer.write(&batch)?;
384 }
385
386 writer.close()?;
387 Ok(())
388 }
389
390 fn create_weather_schema(&self) -> Arc<Schema> {
392 let fields = vec![
393 Field::new("station_id", DataType::UInt32, false),
394 Field::new("station_name", DataType::Utf8, false),
395 Field::new("date", DataType::Date32, false),
396 Field::new("latitude", DataType::Float64, false),
397 Field::new("longitude", DataType::Float64, false),
398 Field::new("temp_min", DataType::Float32, true),
400 Field::new("temp_max", DataType::Float32, true),
401 Field::new("temp_avg", DataType::Float32, true),
402 Field::new("precipitation", DataType::Float32, true),
404 Field::new("wind_speed", DataType::Float32, true),
406 Field::new("temp_quality", DataType::Utf8, true),
408 Field::new("precip_quality", DataType::Utf8, true),
409 Field::new("wind_quality", DataType::Utf8, true),
410 Field::new("temp_validation", DataType::Utf8, true),
412 Field::new("precip_validation", DataType::Utf8, true),
413 Field::new("wind_validation", DataType::Utf8, true),
414 ];
415
416 Arc::new(Schema::new(fields))
417 }
418
419 fn weather_records_to_batch(
421 &self,
422 records: &[WeatherRecord],
423 schema: Arc<Schema>,
424 ) -> Result<RecordBatch> {
425 let station_ids: Vec<u32> = records.iter().map(|r| r.station_id).collect();
427 let station_names: Vec<String> = records.iter().map(|r| r.station_name.clone()).collect();
428 let dates: Vec<i32> = records
429 .iter()
430 .map(|r| {
431 r.date
432 .signed_duration_since(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
433 .num_days() as i32
434 })
435 .collect();
436 let latitudes: Vec<f64> = records.iter().map(|r| r.latitude).collect();
437 let longitudes: Vec<f64> = records.iter().map(|r| r.longitude).collect();
438
439 let temp_mins: Vec<Option<f32>> = records.iter().map(|r| r.temp_min).collect();
441 let temp_maxs: Vec<Option<f32>> = records.iter().map(|r| r.temp_max).collect();
442 let temp_avgs: Vec<Option<f32>> = records.iter().map(|r| r.temp_avg).collect();
443
444 let precipitations: Vec<Option<f32>> = records.iter().map(|r| r.precipitation).collect();
446 let wind_speeds: Vec<Option<f32>> = records.iter().map(|r| r.wind_speed).collect();
447
448 let temp_qualities: Vec<Option<String>> =
450 records.iter().map(|r| r.temp_quality.clone()).collect();
451 let precip_qualities: Vec<Option<String>> =
452 records.iter().map(|r| r.precip_quality.clone()).collect();
453 let wind_qualities: Vec<Option<String>> =
454 records.iter().map(|r| r.wind_quality.clone()).collect();
455
456 let temp_validations: Vec<Option<String>> = records
458 .iter()
459 .map(|r| r.temp_validation.map(|v| format!("{:?}", v)))
460 .collect();
461 let precip_validations: Vec<Option<String>> = records
462 .iter()
463 .map(|r| r.precip_validation.map(|v| format!("{:?}", v)))
464 .collect();
465 let wind_validations: Vec<Option<String>> = records
466 .iter()
467 .map(|r| r.wind_validation.map(|v| format!("{:?}", v)))
468 .collect();
469
470 let station_id_array = Arc::new(UInt32Array::from(station_ids));
472 let station_name_array = Arc::new(StringArray::from(station_names));
473 let date_array = Arc::new(Date32Array::from(dates));
474 let latitude_array = Arc::new(Float64Array::from(latitudes));
475 let longitude_array = Arc::new(Float64Array::from(longitudes));
476
477 let temp_min_array = Arc::new(Float32Array::from(temp_mins));
479 let temp_max_array = Arc::new(Float32Array::from(temp_maxs));
480 let temp_avg_array = Arc::new(Float32Array::from(temp_avgs));
481 let precipitation_array = Arc::new(Float32Array::from(precipitations));
482 let wind_speed_array = Arc::new(Float32Array::from(wind_speeds));
483
484 let temp_quality_array = Arc::new(StringArray::from(temp_qualities));
485 let precip_quality_array = Arc::new(StringArray::from(precip_qualities));
486 let wind_quality_array = Arc::new(StringArray::from(wind_qualities));
487
488 let temp_validation_array = Arc::new(StringArray::from(temp_validations));
489 let precip_validation_array = Arc::new(StringArray::from(precip_validations));
490 let wind_validation_array = Arc::new(StringArray::from(wind_validations));
491
492 let batch = RecordBatch::try_new(
494 schema,
495 vec![
496 station_id_array,
497 station_name_array,
498 date_array,
499 latitude_array,
500 longitude_array,
501 temp_min_array,
502 temp_max_array,
503 temp_avg_array,
504 precipitation_array,
505 wind_speed_array,
506 temp_quality_array,
507 precip_quality_array,
508 wind_quality_array,
509 temp_validation_array,
510 precip_validation_array,
511 wind_validation_array,
512 ],
513 )?;
514
515 Ok(batch)
516 }
517
518 pub fn read_sample_weather_records(
520 &self,
521 path: &Path,
522 limit: usize,
523 ) -> Result<Vec<WeatherRecord>> {
524 use arrow::array::*;
525 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
526
527 let file = File::open(path)?;
528 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
529 .with_batch_size(limit.min(8192))
530 .build()?;
531
532 let mut records = Vec::new();
533 let mut total_read = 0;
534
535 for batch_result in parquet_reader {
536 let batch = batch_result?;
537
538 let num_columns = batch.num_columns();
540
541 if num_columns < 13 {
542 return Ok(Vec::new());
544 }
545
546 let has_validation_fields = num_columns >= 16;
547
548 let station_ids = batch
550 .column(0)
551 .as_any()
552 .downcast_ref::<UInt32Array>()
553 .ok_or_else(|| {
554 crate::error::ProcessingError::Config(
555 "Invalid station_id column type".to_string(),
556 )
557 })?;
558 let station_names = batch
559 .column(1)
560 .as_any()
561 .downcast_ref::<StringArray>()
562 .ok_or_else(|| {
563 crate::error::ProcessingError::Config(
564 "Invalid station_name column type".to_string(),
565 )
566 })?;
567 let dates = batch
568 .column(2)
569 .as_any()
570 .downcast_ref::<Date32Array>()
571 .ok_or_else(|| {
572 crate::error::ProcessingError::Config("Invalid date column type".to_string())
573 })?;
574 let latitudes = batch
575 .column(3)
576 .as_any()
577 .downcast_ref::<Float64Array>()
578 .ok_or_else(|| {
579 crate::error::ProcessingError::Config(
580 "Invalid latitude column type".to_string(),
581 )
582 })?;
583 let longitudes = batch
584 .column(4)
585 .as_any()
586 .downcast_ref::<Float64Array>()
587 .ok_or_else(|| {
588 crate::error::ProcessingError::Config(
589 "Invalid longitude column type".to_string(),
590 )
591 })?;
592
593 let temp_mins = batch
595 .column(5)
596 .as_any()
597 .downcast_ref::<Float32Array>()
598 .ok_or_else(|| {
599 crate::error::ProcessingError::Config(
600 "Invalid temp_min column type".to_string(),
601 )
602 })?;
603 let temp_maxs = batch
604 .column(6)
605 .as_any()
606 .downcast_ref::<Float32Array>()
607 .ok_or_else(|| {
608 crate::error::ProcessingError::Config(
609 "Invalid temp_max column type".to_string(),
610 )
611 })?;
612 let temp_avgs = batch
613 .column(7)
614 .as_any()
615 .downcast_ref::<Float32Array>()
616 .ok_or_else(|| {
617 crate::error::ProcessingError::Config(
618 "Invalid temp_avg column type".to_string(),
619 )
620 })?;
621
622 let precipitations = batch
624 .column(8)
625 .as_any()
626 .downcast_ref::<Float32Array>()
627 .ok_or_else(|| {
628 crate::error::ProcessingError::Config(
629 "Invalid precipitation column type".to_string(),
630 )
631 })?;
632 let wind_speeds = batch
633 .column(9)
634 .as_any()
635 .downcast_ref::<Float32Array>()
636 .ok_or_else(|| {
637 crate::error::ProcessingError::Config(
638 "Invalid wind_speed column type".to_string(),
639 )
640 })?;
641
642 let temp_qualities = batch
644 .column(10)
645 .as_any()
646 .downcast_ref::<StringArray>()
647 .ok_or_else(|| {
648 crate::error::ProcessingError::Config(
649 "Invalid temp_quality column type".to_string(),
650 )
651 })?;
652 let precip_qualities = batch
653 .column(11)
654 .as_any()
655 .downcast_ref::<StringArray>()
656 .ok_or_else(|| {
657 crate::error::ProcessingError::Config(
658 "Invalid precip_quality column type".to_string(),
659 )
660 })?;
661 let wind_qualities = batch
662 .column(12)
663 .as_any()
664 .downcast_ref::<StringArray>()
665 .ok_or_else(|| {
666 crate::error::ProcessingError::Config(
667 "Invalid wind_quality column type".to_string(),
668 )
669 })?;
670
671 let temp_validations = if has_validation_fields {
673 Some(
674 batch
675 .column(13)
676 .as_any()
677 .downcast_ref::<StringArray>()
678 .ok_or_else(|| {
679 crate::error::ProcessingError::Config(
680 "Invalid temp_validation column type".to_string(),
681 )
682 })?,
683 )
684 } else {
685 None
686 };
687 let precip_validations = if has_validation_fields {
688 Some(
689 batch
690 .column(14)
691 .as_any()
692 .downcast_ref::<StringArray>()
693 .ok_or_else(|| {
694 crate::error::ProcessingError::Config(
695 "Invalid precip_validation column type".to_string(),
696 )
697 })?,
698 )
699 } else {
700 None
701 };
702 let wind_validations = if has_validation_fields {
703 Some(
704 batch
705 .column(15)
706 .as_any()
707 .downcast_ref::<StringArray>()
708 .ok_or_else(|| {
709 crate::error::ProcessingError::Config(
710 "Invalid wind_validation column type".to_string(),
711 )
712 })?,
713 )
714 } else {
715 None
716 };
717
718 let batch_records_to_read = (batch.num_rows()).min(limit - total_read);
720
721 for i in 0..batch_records_to_read {
722 let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
723 + chrono::Duration::days(dates.value(i) as i64);
724
725 use crate::models::weather::PhysicalValidity;
726
727 let record = WeatherRecord::new_raw(
728 station_ids.value(i),
729 station_names.value(i).to_string(),
730 date,
731 latitudes.value(i),
732 longitudes.value(i),
733 if temp_mins.is_null(i) {
734 None
735 } else {
736 Some(temp_mins.value(i))
737 },
738 if temp_maxs.is_null(i) {
739 None
740 } else {
741 Some(temp_maxs.value(i))
742 },
743 if temp_avgs.is_null(i) {
744 None
745 } else {
746 Some(temp_avgs.value(i))
747 },
748 if precipitations.is_null(i) {
749 None
750 } else {
751 Some(precipitations.value(i))
752 },
753 if wind_speeds.is_null(i) {
754 None
755 } else {
756 Some(wind_speeds.value(i))
757 },
758 if temp_qualities.is_null(i) {
759 None
760 } else {
761 Some(temp_qualities.value(i).to_string())
762 },
763 if precip_qualities.is_null(i) {
764 None
765 } else {
766 Some(precip_qualities.value(i).to_string())
767 },
768 if wind_qualities.is_null(i) {
769 None
770 } else {
771 Some(wind_qualities.value(i).to_string())
772 },
773 temp_validations.and_then(|arr| {
775 if arr.is_null(i) {
776 None
777 } else {
778 PhysicalValidity::parse(arr.value(i))
779 }
780 }),
781 precip_validations.and_then(|arr| {
782 if arr.is_null(i) {
783 None
784 } else {
785 PhysicalValidity::parse(arr.value(i))
786 }
787 }),
788 wind_validations.and_then(|arr| {
789 if arr.is_null(i) {
790 None
791 } else {
792 PhysicalValidity::parse(arr.value(i))
793 }
794 }),
795 );
796
797 records.push(record);
798 total_read += 1;
799
800 if total_read >= limit {
801 break;
802 }
803 }
804
805 if total_read >= limit {
806 break;
807 }
808 }
809
810 Ok(records)
811 }
812
813 pub fn detect_schema_type(&self, path: &Path) -> Result<SchemaType> {
815 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
816
817 let file = File::open(path)?;
818 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
819 let schema = builder.schema();
820
821 let num_columns = schema.fields().len();
823
824 if num_columns == 9 {
825 Ok(SchemaType::ConsolidatedRecord)
827 } else if num_columns == 13 || num_columns == 16 {
828 Ok(SchemaType::WeatherRecord)
830 } else {
831 Ok(SchemaType::Unknown)
832 }
833 }
834
835 pub fn analyze_weather_dataset(
837 &self,
838 path: &Path,
839 sample_size: usize,
840 ) -> Result<WeatherDatasetSummary> {
841 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
842 use std::collections::HashSet;
843
844 let file = File::open(path)?;
845 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
846
847 let mut total_records = 0;
848 let mut stations: HashSet<u32> = HashSet::new();
849 let mut all_records = Vec::new();
850
851 let mut min_lat = f64::MAX;
853 let mut max_lat = f64::MIN;
854 let mut min_lon = f64::MAX;
855 let mut max_lon = f64::MIN;
856 let mut min_date = None;
857 let mut max_date = None;
858
859 let mut temp_records = 0;
861 let mut temp_stations: HashSet<u32> = HashSet::new();
862 let mut temp_dates = Vec::new();
863 let mut precip_records = 0;
864 let mut precip_stations: HashSet<u32> = HashSet::new();
865 let mut precip_dates = Vec::new();
866 let mut wind_records = 0;
867 let mut wind_stations: HashSet<u32> = HashSet::new();
868 let mut wind_dates = Vec::new();
869
870 let mut coldest_record: Option<WeatherRecord> = None;
872 let mut hottest_record: Option<WeatherRecord> = None;
873 let mut wettest_record: Option<WeatherRecord> = None;
874 let mut windiest_record: Option<WeatherRecord> = None;
875 let mut min_temp_val = f32::MAX;
876 let mut max_temp_val = f32::MIN;
877 let mut max_precip_val = f32::MIN;
878 let mut max_wind_val = f32::MIN;
879
880 let mut ecad_valid = 0;
882 let mut ecad_suspect = 0;
883 let mut ecad_missing = 0;
884 let mut physically_valid = 0;
885 let mut physically_suspect = 0;
886 let mut physically_invalid = 0;
887 let mut combined_valid = 0;
888 let mut combined_suspect_original = 0;
889 let mut combined_suspect_range = 0;
890 let mut combined_suspect_both = 0;
891 let mut combined_invalid = 0;
892 let mut combined_missing = 0;
893
894 for batch_result in parquet_reader {
895 let batch = batch_result?;
896 let num_rows = batch.num_rows();
897
898 if batch.num_columns() < 13 {
899 continue; }
901
902 let has_validation_fields = batch.num_columns() >= 16;
903
904 let station_ids = batch
906 .column(0)
907 .as_any()
908 .downcast_ref::<UInt32Array>()
909 .unwrap();
910 let station_names = batch
911 .column(1)
912 .as_any()
913 .downcast_ref::<StringArray>()
914 .unwrap();
915 let dates = batch
916 .column(2)
917 .as_any()
918 .downcast_ref::<Date32Array>()
919 .unwrap();
920 let latitudes = batch
921 .column(3)
922 .as_any()
923 .downcast_ref::<Float64Array>()
924 .unwrap();
925 let longitudes = batch
926 .column(4)
927 .as_any()
928 .downcast_ref::<Float64Array>()
929 .unwrap();
930 let temp_mins = batch
931 .column(5)
932 .as_any()
933 .downcast_ref::<Float32Array>()
934 .unwrap();
935 let temp_maxs = batch
936 .column(6)
937 .as_any()
938 .downcast_ref::<Float32Array>()
939 .unwrap();
940 let temp_avgs = batch
941 .column(7)
942 .as_any()
943 .downcast_ref::<Float32Array>()
944 .unwrap();
945 let precipitations = batch
946 .column(8)
947 .as_any()
948 .downcast_ref::<Float32Array>()
949 .unwrap();
950 let wind_speeds = batch
951 .column(9)
952 .as_any()
953 .downcast_ref::<Float32Array>()
954 .unwrap();
955 let temp_qualities = batch
956 .column(10)
957 .as_any()
958 .downcast_ref::<StringArray>()
959 .unwrap();
960 let precip_qualities = batch
961 .column(11)
962 .as_any()
963 .downcast_ref::<StringArray>()
964 .unwrap();
965 let wind_qualities = batch
966 .column(12)
967 .as_any()
968 .downcast_ref::<StringArray>()
969 .unwrap();
970
971 let temp_validations = if has_validation_fields {
973 Some(
974 batch
975 .column(13)
976 .as_any()
977 .downcast_ref::<StringArray>()
978 .unwrap(),
979 )
980 } else {
981 None
982 };
983 let precip_validations = if has_validation_fields {
984 Some(
985 batch
986 .column(14)
987 .as_any()
988 .downcast_ref::<StringArray>()
989 .unwrap(),
990 )
991 } else {
992 None
993 };
994 let wind_validations = if has_validation_fields {
995 Some(
996 batch
997 .column(15)
998 .as_any()
999 .downcast_ref::<StringArray>()
1000 .unwrap(),
1001 )
1002 } else {
1003 None
1004 };
1005
1006 for i in 0..num_rows {
1007 total_records += 1;
1008 let station_id = station_ids.value(i);
1009 stations.insert(station_id);
1010
1011 let date = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
1012 + chrono::Duration::days(dates.value(i) as i64);
1013
1014 min_date = Some(min_date.map_or(date, |d: chrono::NaiveDate| d.min(date)));
1016 max_date = Some(max_date.map_or(date, |d: chrono::NaiveDate| d.max(date)));
1017
1018 let lat = latitudes.value(i);
1020 let lon = longitudes.value(i);
1021 min_lat = min_lat.min(lat);
1022 max_lat = max_lat.max(lat);
1023 min_lon = min_lon.min(lon);
1024 max_lon = max_lon.max(lon);
1025
1026 use crate::models::weather::{DataQuality, PhysicalValidity};
1028
1029 let record = WeatherRecord::new_raw(
1030 station_id,
1031 station_names.value(i).to_string(),
1032 date,
1033 lat,
1034 lon,
1035 if temp_mins.is_null(i) {
1036 None
1037 } else {
1038 Some(temp_mins.value(i))
1039 },
1040 if temp_maxs.is_null(i) {
1041 None
1042 } else {
1043 Some(temp_maxs.value(i))
1044 },
1045 if temp_avgs.is_null(i) {
1046 None
1047 } else {
1048 Some(temp_avgs.value(i))
1049 },
1050 if precipitations.is_null(i) {
1051 None
1052 } else {
1053 Some(precipitations.value(i))
1054 },
1055 if wind_speeds.is_null(i) {
1056 None
1057 } else {
1058 Some(wind_speeds.value(i))
1059 },
1060 if temp_qualities.is_null(i) {
1061 None
1062 } else {
1063 Some(temp_qualities.value(i).to_string())
1064 },
1065 if precip_qualities.is_null(i) {
1066 None
1067 } else {
1068 Some(precip_qualities.value(i).to_string())
1069 },
1070 if wind_qualities.is_null(i) {
1071 None
1072 } else {
1073 Some(wind_qualities.value(i).to_string())
1074 },
1075 temp_validations.and_then(|arr| {
1077 if arr.is_null(i) {
1078 None
1079 } else {
1080 PhysicalValidity::parse(arr.value(i))
1081 }
1082 }),
1083 precip_validations.and_then(|arr| {
1084 if arr.is_null(i) {
1085 None
1086 } else {
1087 PhysicalValidity::parse(arr.value(i))
1088 }
1089 }),
1090 wind_validations.and_then(|arr| {
1091 if arr.is_null(i) {
1092 None
1093 } else {
1094 PhysicalValidity::parse(arr.value(i))
1095 }
1096 }),
1097 );
1098
1099 if record.has_temperature_data() {
1101 temp_records += 1;
1102 temp_stations.insert(station_id);
1103 temp_dates.push(date);
1104
1105 let temp_quality = record.assess_temperature_quality();
1107 if !matches!(temp_quality, DataQuality::Invalid) {
1108 if let Some(min_temp) = record.temp_min {
1109 if min_temp < min_temp_val {
1110 min_temp_val = min_temp;
1111 coldest_record = Some(record.clone());
1112 }
1113 }
1114
1115 if let Some(max_temp) = record.temp_max {
1116 if max_temp > max_temp_val {
1117 max_temp_val = max_temp;
1118 hottest_record = Some(record.clone());
1119 }
1120 }
1121 }
1122 }
1123
1124 if record.has_precipitation() {
1125 precip_records += 1;
1126 precip_stations.insert(station_id);
1127 precip_dates.push(date);
1128
1129 let precip_quality = record.assess_precipitation_quality();
1131 if !matches!(precip_quality, DataQuality::Invalid) {
1132 if let Some(precip) = record.precipitation {
1133 if precip > max_precip_val {
1134 max_precip_val = precip;
1135 wettest_record = Some(record.clone());
1136 }
1137 }
1138 }
1139 }
1140
1141 if record.has_wind_speed() {
1142 wind_records += 1;
1143 wind_stations.insert(station_id);
1144 wind_dates.push(date);
1145
1146 let wind_quality = record.assess_wind_quality();
1148 if !matches!(wind_quality, DataQuality::Invalid) {
1149 if let Some(wind) = record.wind_speed {
1150 if wind > max_wind_val {
1151 max_wind_val = wind;
1152 windiest_record = Some(record.clone());
1153 }
1154 }
1155 }
1156 }
1157
1158 if record.has_temperature_data() {
1164 if let Some(temp_quality) = &record.temp_quality {
1165 if temp_quality.contains('0') {
1166 ecad_valid += 1;
1167 } else if temp_quality.contains('1') {
1168 ecad_suspect += 1;
1169 } else if temp_quality.contains('9') {
1170 ecad_missing += 1;
1171 }
1172 }
1173 }
1174
1175 if record.has_precipitation() {
1177 if let Some(precip_quality) = &record.precip_quality {
1178 if precip_quality == "0" {
1179 ecad_valid += 1;
1180 } else if precip_quality == "1" {
1181 ecad_suspect += 1;
1182 } else if precip_quality == "9" {
1183 ecad_missing += 1;
1184 }
1185 }
1186 }
1187
1188 if record.has_wind_speed() {
1190 if let Some(wind_quality) = &record.wind_quality {
1191 if wind_quality == "0" {
1192 ecad_valid += 1;
1193 } else if wind_quality == "1" {
1194 ecad_suspect += 1;
1195 } else if wind_quality == "9" {
1196 ecad_missing += 1;
1197 }
1198 }
1199 }
1200
1201 if let Some(validation) = record.temp_validation {
1203 match validation {
1204 PhysicalValidity::Valid => physically_valid += 1,
1205 PhysicalValidity::Suspect => physically_suspect += 1,
1206 PhysicalValidity::Invalid => physically_invalid += 1,
1207 }
1208 }
1209 if let Some(validation) = record.precip_validation {
1210 match validation {
1211 PhysicalValidity::Valid => physically_valid += 1,
1212 PhysicalValidity::Suspect => physically_suspect += 1,
1213 PhysicalValidity::Invalid => physically_invalid += 1,
1214 }
1215 }
1216 if let Some(validation) = record.wind_validation {
1217 match validation {
1218 PhysicalValidity::Valid => physically_valid += 1,
1219 PhysicalValidity::Suspect => physically_suspect += 1,
1220 PhysicalValidity::Invalid => physically_invalid += 1,
1221 }
1222 }
1223
1224 let temp_quality = record.assess_temperature_quality();
1226 let precip_quality = record.assess_precipitation_quality();
1227 let wind_quality = record.assess_wind_quality();
1228
1229 for quality in [temp_quality, precip_quality, wind_quality] {
1230 match quality {
1231 DataQuality::Valid => combined_valid += 1,
1232 DataQuality::SuspectOriginal => combined_suspect_original += 1,
1233 DataQuality::SuspectRange => combined_suspect_range += 1,
1234 DataQuality::SuspectBoth => combined_suspect_both += 1,
1235 DataQuality::Invalid => combined_invalid += 1,
1236 DataQuality::Missing => combined_missing += 1,
1237 }
1238 }
1239
1240 all_records.push(record);
1242 }
1243 }
1244
1245 let sample_records = self.create_diverse_sample(&all_records, sample_size);
1247
1248 temp_dates.sort();
1250 precip_dates.sort();
1251 wind_dates.sort();
1252
1253 let temperature_range = if !temp_dates.is_empty() {
1254 Some((temp_dates[0], temp_dates[temp_dates.len() - 1]))
1255 } else {
1256 None
1257 };
1258
1259 let precipitation_range = if !precip_dates.is_empty() {
1260 Some((precip_dates[0], precip_dates[precip_dates.len() - 1]))
1261 } else {
1262 None
1263 };
1264
1265 let wind_range = if !wind_dates.is_empty() {
1266 Some((wind_dates[0], wind_dates[wind_dates.len() - 1]))
1267 } else {
1268 None
1269 };
1270
1271 let countries = if min_lon < -5.0 && max_lat > 53.0 {
1273 vec!["GB".to_string(), "IE".to_string()]
1274 } else if max_lat > 55.0 {
1275 vec!["GB".to_string()]
1276 } else {
1277 vec!["IE".to_string()]
1278 };
1279
1280 Ok(WeatherDatasetSummary {
1281 total_records,
1282 total_stations: stations.len(),
1283 geographic_bounds: GeographicBounds {
1284 min_lat,
1285 max_lat,
1286 min_lon,
1287 max_lon,
1288 countries,
1289 },
1290 temporal_coverage: TemporalCoverage {
1291 overall_start: min_date
1292 .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap()),
1293 overall_end: max_date
1294 .unwrap_or_else(|| chrono::NaiveDate::from_ymd_opt(2000, 1, 1).unwrap()),
1295 temperature_range,
1296 precipitation_range,
1297 wind_range,
1298 },
1299 metric_statistics: MetricStatistics {
1300 temperature_records: temp_records,
1301 temperature_stations: temp_stations.len(),
1302 precipitation_records: precip_records,
1303 precipitation_stations: precip_stations.len(),
1304 wind_records,
1305 wind_stations: wind_stations.len(),
1306 temperature_range: if min_temp_val != f32::MAX && max_temp_val != f32::MIN {
1307 Some((min_temp_val, max_temp_val))
1308 } else {
1309 None
1310 },
1311 precipitation_range: if max_precip_val != f32::MIN {
1312 Some((0.0, max_precip_val))
1313 } else {
1314 None
1315 },
1316 wind_range: if max_wind_val != f32::MIN {
1317 Some((0.0, max_wind_val))
1318 } else {
1319 None
1320 },
1321 },
1322 data_quality: EnhancedDataQuality {
1323 ecad_valid,
1324 ecad_suspect,
1325 ecad_missing,
1326 physically_valid,
1327 physically_suspect,
1328 physically_invalid,
1329 combined_valid,
1330 combined_suspect_original,
1331 combined_suspect_range,
1332 combined_suspect_both,
1333 combined_invalid,
1334 combined_missing,
1335 validation_errors: 0, },
1337 sample_records,
1338 extreme_records: ExtremeRecords {
1339 coldest: coldest_record,
1340 hottest: hottest_record,
1341 wettest: wettest_record,
1342 windiest: windiest_record,
1343 },
1344 })
1345 }
1346
1347 fn create_diverse_sample(
1349 &self,
1350 all_records: &[WeatherRecord],
1351 sample_size: usize,
1352 ) -> Vec<WeatherRecord> {
1353 use std::collections::HashMap;
1354
1355 if all_records.is_empty() || sample_size == 0 {
1356 return Vec::new();
1357 }
1358
1359 let mut samples = Vec::new();
1360 let mut station_counts: HashMap<String, usize> = HashMap::new();
1361
1362 let total_records = all_records.len();
1364 let step = if total_records > sample_size * 100 {
1365 total_records / (sample_size * 50) } else {
1367 std::cmp::max(1, total_records / sample_size)
1368 };
1369
1370 for (i, record) in all_records.iter().enumerate() {
1371 if i % step == 0 && samples.len() < sample_size {
1372 let station_count = station_counts
1374 .entry(record.station_name.clone())
1375 .or_insert(0);
1376 if *station_count < 2 {
1377 samples.push(record.clone());
1379 *station_count += 1;
1380 }
1381 }
1382 }
1383
1384 if samples.len() < sample_size {
1386 for record in all_records.iter().step_by(step * 2) {
1387 if samples.len() >= sample_size {
1388 break;
1389 }
1390 if !samples
1391 .iter()
1392 .any(|s| s.station_name == record.station_name && s.date == record.date)
1393 {
1394 samples.push(record.clone());
1395 }
1396 }
1397 }
1398
1399 samples.truncate(sample_size);
1400 samples
1401 }
1402}
1403
/// Identifies which known record layout a Parquet schema corresponds to.
/// NOTE(review): not referenced in this part of the file — presumably used
/// when inspecting existing Parquet files; confirm at call sites.
#[derive(Debug, PartialEq)]
pub enum SchemaType {
    /// Schema matching the `ConsolidatedRecord` layout.
    ConsolidatedRecord,
    /// Schema matching the `WeatherRecord` layout.
    WeatherRecord,
    /// Schema matching neither known layout.
    Unknown,
}
1410
/// Aggregate analysis of a unified weather dataset, rendered for humans by
/// `display_comprehensive_summary`.
#[derive(Debug, Clone)]
pub struct WeatherDatasetSummary {
    // Total number of records analysed.
    pub total_records: usize,
    // Number of distinct stations seen.
    pub total_stations: usize,
    // Bounding box plus inferred country list.
    pub geographic_bounds: GeographicBounds,
    // Overall and per-metric date coverage.
    pub temporal_coverage: TemporalCoverage,
    // Per-metric record/station counts and value ranges.
    pub metric_statistics: MetricStatistics,
    // Layered quality counters (ECAD flags, physical checks, combined).
    pub data_quality: EnhancedDataQuality,
    // Diverse subset of records kept for display.
    pub sample_records: Vec<WeatherRecord>,
    // Coldest/hottest/wettest/windiest records found.
    pub extreme_records: ExtremeRecords,
}
1422
/// Geographic bounding box of the analysed stations, in decimal degrees.
#[derive(Debug, Clone)]
pub struct GeographicBounds {
    pub min_lat: f64,
    pub max_lat: f64,
    // Negative values are west of Greenwich (displayed as °W in the report).
    pub min_lon: f64,
    pub max_lon: f64,
    // Country codes (e.g. "GB", "IE") inferred from the bounding box.
    pub countries: Vec<String>,
}
1431
/// Date coverage of the dataset, overall and broken down per metric.
#[derive(Debug, Clone)]
pub struct TemporalCoverage {
    // Earliest date seen (falls back to 1900-01-01 when the dataset is empty).
    pub overall_start: chrono::NaiveDate,
    // Latest date seen (falls back to 2000-01-01 when the dataset is empty).
    pub overall_end: chrono::NaiveDate,
    // (first, last) date with temperature data; None if the metric is absent.
    pub temperature_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    // (first, last) date with precipitation data; None if the metric is absent.
    pub precipitation_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
    // (first, last) date with wind data; None if the metric is absent.
    pub wind_range: Option<(chrono::NaiveDate, chrono::NaiveDate)>,
}
1440
/// Per-metric record/station counts and observed value ranges.
#[derive(Debug, Clone)]
pub struct MetricStatistics {
    pub temperature_records: usize,
    pub temperature_stations: usize,
    pub precipitation_records: usize,
    pub precipitation_stations: usize,
    pub wind_records: usize,
    pub wind_stations: usize,
    // (min, max) observed temperature; None when no temperature data exists.
    pub temperature_range: Option<(f32, f32)>,
    // (0.0, max) observed precipitation; the lower bound is fixed at zero
    // where this is constructed in this file.
    pub precipitation_range: Option<(f32, f32)>,
    // (0.0, max) observed wind speed; lower bound fixed at zero likewise.
    pub wind_range: Option<(f32, f32)>,
}
1453
/// Quality counters accumulated at three levels: original ECAD flags,
/// physical-range validation, and the combined per-metric assessment.
#[derive(Debug, Clone)]
pub struct EnhancedDataQuality {
    // ECAD flag counts: "0" = valid, "1" = suspect, "9" = missing
    // (see the flag comparisons where these are accumulated).
    pub ecad_valid: usize,
    pub ecad_suspect: usize,
    pub ecad_missing: usize,

    // Counts per `PhysicalValidity` outcome of the range checks.
    pub physically_valid: usize,
    pub physically_suspect: usize,
    pub physically_invalid: usize,

    // Counts per `DataQuality` variant of the combined assessment
    // (suspect by original flag, by range check, or by both).
    pub combined_valid: usize,
    pub combined_suspect_original: usize,
    pub combined_suspect_range: usize,
    pub combined_suspect_both: usize,
    pub combined_invalid: usize,
    pub combined_missing: usize,

    // NOTE(review): always set to 0 where this struct is built in this
    // file — confirm whether any producer actually populates it.
    pub validation_errors: usize,
}
1476
/// The single most extreme record found for each metric.
/// Each field is None when no qualifying record was found.
#[derive(Debug, Clone)]
pub struct ExtremeRecords {
    pub coldest: Option<WeatherRecord>,
    pub hottest: Option<WeatherRecord>,
    pub wettest: Option<WeatherRecord>,
    pub windiest: Option<WeatherRecord>,
}
1484
1485impl WeatherDatasetSummary {
1486 pub fn display_comprehensive_summary(&self) -> String {
1487 let mut summary = String::new();
1488
1489 summary.push_str("UNIFIED WEATHER DATASET ANALYSIS\n");
1491 summary.push_str("================================\n\n");
1492
1493 summary.push_str(&format!(
1495 "Dataset Overview:\n\
1496 - Records: {} unified weather records\n\
1497 - Stations: {} across {} ({:.1}°N-{:.1}°N, {:.1}°W-{:.1}°E)\n\
1498 - Timespan: {} to {} ({} years)\n\n",
1499 self.total_records,
1500 self.total_stations,
1501 self.geographic_bounds.countries.join("/"),
1502 self.geographic_bounds.min_lat,
1503 self.geographic_bounds.max_lat,
1504 if self.geographic_bounds.min_lon < 0.0 {
1505 -self.geographic_bounds.min_lon
1506 } else {
1507 self.geographic_bounds.min_lon
1508 },
1509 self.geographic_bounds.max_lon,
1510 self.temporal_coverage.overall_start,
1511 self.temporal_coverage.overall_end,
1512 self.temporal_coverage.overall_end.year() - self.temporal_coverage.overall_start.year()
1513 ));
1514
1515 summary.push_str("Metric Coverage:\n");
1517 summary.push_str(
1518 "┌─────────────────┬──────────┬─────────────┬──────────────┬─────────────┐\n",
1519 );
1520 summary.push_str(
1521 "│ Metric │ Stations │ Records │ Coverage │ Date Range │\n",
1522 );
1523 summary.push_str(
1524 "├─────────────────┼──────────┼─────────────┼──────────────┼─────────────┤\n",
1525 );
1526
1527 let temp_coverage = if self.total_records > 0 {
1529 (self.metric_statistics.temperature_records as f32 / self.total_records as f32) * 100.0
1530 } else {
1531 0.0
1532 };
1533
1534 let temp_range = if let Some((start, end)) = self.temporal_coverage.temperature_range {
1535 format!("{}-{}", start.year(), end.year())
1536 } else {
1537 "N/A".to_string()
1538 };
1539
1540 summary.push_str(&format!(
1541 "│ Temperature │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1542 self.metric_statistics.temperature_stations,
1543 self.metric_statistics.temperature_records,
1544 temp_coverage,
1545 temp_range
1546 ));
1547
1548 let precip_coverage = if self.total_records > 0 {
1550 (self.metric_statistics.precipitation_records as f32 / self.total_records as f32)
1551 * 100.0
1552 } else {
1553 0.0
1554 };
1555
1556 let precip_range = if let Some((start, end)) = self.temporal_coverage.precipitation_range {
1557 format!("{}-{}", start.year(), end.year())
1558 } else {
1559 "N/A".to_string()
1560 };
1561
1562 summary.push_str(&format!(
1563 "│ Precipitation │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1564 self.metric_statistics.precipitation_stations,
1565 self.metric_statistics.precipitation_records,
1566 precip_coverage,
1567 precip_range
1568 ));
1569
1570 let wind_coverage = if self.total_records > 0 {
1572 (self.metric_statistics.wind_records as f32 / self.total_records as f32) * 100.0
1573 } else {
1574 0.0
1575 };
1576
1577 let wind_range = if let Some((start, end)) = self.temporal_coverage.wind_range {
1578 format!("{}-{}", start.year(), end.year())
1579 } else {
1580 "N/A".to_string()
1581 };
1582
1583 summary.push_str(&format!(
1584 "│ Wind Speed │ {:8} │ {:11} │ {:10.1}% │ {:11} │\n",
1585 self.metric_statistics.wind_stations,
1586 self.metric_statistics.wind_records,
1587 wind_coverage,
1588 wind_range
1589 ));
1590
1591 summary.push_str(
1592 "└─────────────────┴──────────┴─────────────┴──────────────┴─────────────┘\n\n",
1593 );
1594
1595 if !self.sample_records.is_empty() {
1597 summary.push_str("Sample Records (diverse stations & metrics):\n");
1598 for (i, record) in self.sample_records.iter().enumerate() {
1599 let mut metrics_display = Vec::new();
1600
1601 let temp_parts: Vec<String> = [
1603 record.temp_min.map(|t| format!("min={:.1}°C", t)),
1604 record.temp_avg.map(|t| format!("avg={:.1}°C", t)),
1605 record.temp_max.map(|t| format!("max={:.1}°C", t)),
1606 ]
1607 .into_iter()
1608 .flatten()
1609 .collect();
1610
1611 if !temp_parts.is_empty() {
1612 metrics_display.push(format!("temp({})", temp_parts.join(", ")));
1613 }
1614
1615 if let Some(precip) = record.precipitation {
1616 metrics_display.push(format!("precip={:.1}mm", precip));
1617 }
1618
1619 if let Some(wind) = record.wind_speed {
1620 metrics_display.push(format!("wind={:.1}m/s", wind));
1621 }
1622
1623 let metrics_str = if metrics_display.is_empty() {
1624 "no data".to_string()
1625 } else {
1626 metrics_display.join(", ")
1627 };
1628
1629 summary.push_str(&format!(
1630 "{}. {} on {}: {}\n",
1631 i + 1,
1632 record.station_name,
1633 record.date,
1634 metrics_str
1635 ));
1636 }
1637 summary.push('\n');
1638 }
1639
1640 summary.push_str("Extreme Records:\n");
1642 if let Some(ref coldest) = self.extreme_records.coldest {
1643 if let Some(min_temp) = coldest.temp_min {
1644 summary.push_str(&format!(
1645 "- Coldest: {:.1}°C at {} ({})\n",
1646 min_temp, coldest.station_name, coldest.date
1647 ));
1648 }
1649 }
1650
1651 if let Some(ref hottest) = self.extreme_records.hottest {
1652 if let Some(max_temp) = hottest.temp_max {
1653 summary.push_str(&format!(
1654 "- Hottest: {:.1}°C at {} ({})\n",
1655 max_temp, hottest.station_name, hottest.date
1656 ));
1657 }
1658 }
1659
1660 if let Some(ref wettest) = self.extreme_records.wettest {
1661 if let Some(precip) = wettest.precipitation {
1662 summary.push_str(&format!(
1663 "- Wettest: {:.1}mm at {} ({})\n",
1664 precip, wettest.station_name, wettest.date
1665 ));
1666 }
1667 }
1668
1669 if let Some(ref windiest) = self.extreme_records.windiest {
1670 if let Some(wind) = windiest.wind_speed {
1671 summary.push_str(&format!(
1672 "- Windiest: {:.1}m/s at {} ({})\n",
1673 wind, windiest.station_name, windiest.date
1674 ));
1675 }
1676 }
1677 summary.push('\n');
1678
1679 summary.push_str("Data Quality Analysis:\n");
1681
1682 let total_ecad = self.data_quality.ecad_valid
1683 + self.data_quality.ecad_suspect
1684 + self.data_quality.ecad_missing;
1685 if total_ecad > 0 {
1686 summary.push_str("├─ ECAD Assessment:\n");
1687 summary.push_str(&format!(
1688 "│ ├─ Valid (flag=0): {} ({:.1}%)\n",
1689 self.data_quality.ecad_valid,
1690 (self.data_quality.ecad_valid as f32 / total_ecad as f32) * 100.0
1691 ));
1692 summary.push_str(&format!(
1693 "│ ├─ Suspect (flag=1): {} ({:.1}%)\n",
1694 self.data_quality.ecad_suspect,
1695 (self.data_quality.ecad_suspect as f32 / total_ecad as f32) * 100.0
1696 ));
1697 summary.push_str(&format!(
1698 "│ └─ Missing (flag=9): {} ({:.1}%)\n",
1699 self.data_quality.ecad_missing,
1700 (self.data_quality.ecad_missing as f32 / total_ecad as f32) * 100.0
1701 ));
1702 summary.push_str("│\n");
1703 }
1704
1705 let total_physical = self.data_quality.physically_valid
1706 + self.data_quality.physically_suspect
1707 + self.data_quality.physically_invalid;
1708 if total_physical > 0 {
1709 summary.push_str("├─ Physical Validation:\n");
1710 summary.push_str(&format!(
1711 "│ ├─ Valid: {} ({:.1}%)\n",
1712 self.data_quality.physically_valid,
1713 (self.data_quality.physically_valid as f32 / total_physical as f32) * 100.0
1714 ));
1715 summary.push_str(&format!(
1716 "│ ├─ Suspect: {} ({:.1}%)\n",
1717 self.data_quality.physically_suspect,
1718 (self.data_quality.physically_suspect as f32 / total_physical as f32) * 100.0
1719 ));
1720 summary.push_str(&format!(
1721 "│ └─ Invalid: {} ({:.3}%)\n",
1722 self.data_quality.physically_invalid,
1723 (self.data_quality.physically_invalid as f32 / total_physical as f32) * 100.0
1724 ));
1725 summary.push_str("│\n");
1726 }
1727
1728 let total_combined = self.data_quality.combined_valid
1729 + self.data_quality.combined_suspect_original
1730 + self.data_quality.combined_suspect_range
1731 + self.data_quality.combined_suspect_both
1732 + self.data_quality.combined_invalid
1733 + self.data_quality.combined_missing;
1734 if total_combined > 0 {
1735 summary.push_str("└─ Combined Quality:\n");
1736 summary.push_str(&format!(
1737 " ├─ Valid: {} ({:.1}%)\n",
1738 self.data_quality.combined_valid,
1739 (self.data_quality.combined_valid as f32 / total_combined as f32) * 100.0
1740 ));
1741 summary.push_str(&format!(
1742 " ├─ Suspect (original): {} ({:.1}%)\n",
1743 self.data_quality.combined_suspect_original,
1744 (self.data_quality.combined_suspect_original as f32 / total_combined as f32)
1745 * 100.0
1746 ));
1747 summary.push_str(&format!(
1748 " ├─ Suspect (range): {} ({:.2}%)\n",
1749 self.data_quality.combined_suspect_range,
1750 (self.data_quality.combined_suspect_range as f32 / total_combined as f32) * 100.0
1751 ));
1752 if self.data_quality.combined_invalid > 0 {
1753 summary.push_str(&format!(
1754 " ├─ Invalid: {} ({:.3}%)\n",
1755 self.data_quality.combined_invalid,
1756 (self.data_quality.combined_invalid as f32 / total_combined as f32) * 100.0
1757 ));
1758 }
1759 summary.push_str(&format!(
1760 " └─ Missing: {} ({:.1}%)\n",
1761 self.data_quality.combined_missing,
1762 (self.data_quality.combined_missing as f32 / total_combined as f32) * 100.0
1763 ));
1764 }
1765
1766 if self.data_quality.physically_invalid > 0 {
1767 summary.push_str(&format!(
1768 "\n⚠️ Found {} physically impossible values that were excluded from extreme records analysis\n",
1769 self.data_quality.physically_invalid
1770 ));
1771 }
1772
1773 summary
1774 }
1775}
1776
1777impl Default for ParquetWriter {
1778 fn default() -> Self {
1779 Self::new()
1780 }
1781}
1782
/// Metadata describing an existing Parquet file, for reporting via `summary`.
#[derive(Debug)]
pub struct ParquetFileInfo {
    // Total row count across all row groups.
    pub total_rows: i64,
    // Number of row groups in the file.
    pub row_groups: i32,
    // Row count of each individual row group.
    pub row_group_sizes: Vec<i64>,
    // On-disk size in bytes.
    pub file_size: u64,
    // Compression codec recorded for the file.
    pub compression: Compression,
}
1791
1792impl ParquetFileInfo {
1793 pub fn summary(&self) -> String {
1794 format!(
1795 "Parquet File Summary:\n\
1796 - Total rows: {}\n\
1797 - Row groups: {}\n\
1798 - File size: {:.2} MB\n\
1799 - Compression: {:?}\n\
1800 - Avg rows per group: {:.0}",
1801 self.total_rows,
1802 self.row_groups,
1803 self.file_size as f64 / 1_048_576.0, self.compression,
1805 self.total_rows as f64 / self.row_groups as f64
1806 )
1807 }
1808}
1809
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ConsolidatedRecord;
    use chrono::NaiveDate;
    use tempfile::NamedTempFile;

    /// Builds the single fixture record shared by the write tests.
    /// (Previously duplicated verbatim in two tests.)
    fn sample_record() -> ConsolidatedRecord {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        ConsolidatedRecord::new(
            12345,
            "Test Station".to_string(),
            date,
            51.5074,
            -0.1278,
            15.0,
            25.0,
            20.0,
            "000".to_string(),
        )
    }

    /// Writing an empty slice must succeed without creating row groups.
    #[test]
    fn test_write_empty_records() {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        let result = writer.write_records(&[], temp_file.path());
        assert!(result.is_ok());
    }

    /// A single record round-trips to a non-empty file on disk.
    #[test]
    fn test_write_single_record() -> Result<()> {
        let writer = ParquetWriter::new();
        let temp_file = NamedTempFile::new().unwrap();

        writer.write_records(&[sample_record()], temp_file.path())?;

        // A successful write must produce a non-empty file.
        let metadata = std::fs::metadata(temp_file.path())?;
        assert!(metadata.len() > 0);

        Ok(())
    }

    /// Every supported compression codec accepts a write.
    #[test]
    fn test_different_compressions() -> Result<()> {
        let compressions = ["snappy", "gzip", "lz4", "zstd", "none"];

        for compression in &compressions {
            let writer = ParquetWriter::new().with_compression(compression)?;
            let temp_file = NamedTempFile::new().unwrap();

            let result = writer.write_records(&[sample_record()], temp_file.path());
            assert!(result.is_ok(), "Failed with compression: {}", compression);
        }

        Ok(())
    }
}