use crate::error::Result;
use crate::models::{ConsolidatedRecord, StationMetadata, TemperatureRecord, TemperatureSet};
use crate::readers::TemperatureData;
use chrono::NaiveDate;
use std::collections::HashMap;
/// Combines separate minimum, maximum and average temperature readings into
/// one consolidated record per station and date.
///
/// The merger itself holds no data, only the policy that decides what to do
/// with days for which some readings are missing.
#[derive(Debug, Clone)]
pub struct DataMerger {
    /// When `false`, a station/date is dropped unless its minimum, maximum
    /// and average readings are all present. When `true`, missing readings
    /// are filled with placeholder values instead.
    allow_incomplete: bool,
}
impl DataMerger {
pub fn new() -> Self {
Self {
allow_incomplete: false,
}
}
pub fn with_allow_incomplete(allow_incomplete: bool) -> Self {
Self { allow_incomplete }
}
pub fn merge_temperature_data(
&self,
temperature_data: &TemperatureData,
) -> Result<Vec<ConsolidatedRecord>> {
let mut consolidated_records = Vec::new();
let grouped_data = self.group_by_station_and_date(temperature_data)?;
for ((station_id, date), temp_set) in grouped_data {
if let Some(station) = temperature_data.stations.get(&station_id) {
if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
consolidated_records.push(record);
}
}
}
consolidated_records.sort_by(|a, b| {
a.station_id
.cmp(&b.station_id)
.then_with(|| a.date.cmp(&b.date))
});
Ok(consolidated_records)
}
fn group_by_station_and_date(
&self,
temperature_data: &TemperatureData,
) -> Result<HashMap<(u32, NaiveDate), TemperatureSet>> {
let mut grouped = HashMap::new();
for ((station_id, date), record) in &temperature_data.min_temperatures {
let entry = grouped
.entry((*station_id, *date))
.or_insert(TemperatureSet::default());
entry.min = Some(record.clone());
}
for ((station_id, date), record) in &temperature_data.max_temperatures {
let entry = grouped
.entry((*station_id, *date))
.or_insert(TemperatureSet::default());
entry.max = Some(record.clone());
}
for ((station_id, date), record) in &temperature_data.avg_temperatures {
let entry = grouped
.entry((*station_id, *date))
.or_insert(TemperatureSet::default());
entry.avg = Some(record.clone());
}
Ok(grouped)
}
fn create_consolidated_record(
&self,
station: &StationMetadata,
date: NaiveDate,
temp_set: TemperatureSet,
) -> Result<Option<ConsolidatedRecord>> {
if !self.allow_incomplete
&& (temp_set.min.is_none() || temp_set.max.is_none() || temp_set.avg.is_none())
{
return Ok(None);
}
let (min_temp, min_quality) = temp_set
.min
.as_ref()
.map(|r| (r.temperature, r.quality_flag))
.unwrap_or((-9999.0, 9));
let (max_temp, max_quality) = temp_set
.max
.as_ref()
.map(|r| (r.temperature, r.quality_flag))
.unwrap_or((-9999.0, 9));
let (avg_temp, avg_quality) = temp_set
.avg
.as_ref()
.map(|r| (r.temperature, r.quality_flag))
.unwrap_or((-9999.0, 9));
if min_temp == -9999.0 && max_temp == -9999.0 && avg_temp == -9999.0 {
return Ok(None);
}
let quality_flags = format!("{}{}{}", min_quality, avg_quality, max_quality);
let record = ConsolidatedRecord::new(
station.staid,
station.name.clone(),
date,
station.latitude,
station.longitude,
min_temp,
max_temp,
avg_temp,
quality_flags,
);
Ok(Some(record))
}
pub fn merge_station_data(
&self,
station: &StationMetadata,
min_temps: Vec<TemperatureRecord>,
max_temps: Vec<TemperatureRecord>,
avg_temps: Vec<TemperatureRecord>,
) -> Result<Vec<ConsolidatedRecord>> {
let mut temp_map: HashMap<NaiveDate, TemperatureSet> = HashMap::new();
for record in min_temps {
let date = record.date;
temp_map.entry(date).or_default().min = Some(record);
}
for record in max_temps {
let date = record.date;
temp_map.entry(date).or_default().max = Some(record);
}
for record in avg_temps {
let date = record.date;
temp_map.entry(date).or_default().avg = Some(record);
}
let mut records = Vec::new();
for (date, temp_set) in temp_map {
if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
records.push(record);
}
}
records.sort_by_key(|r| r.date);
Ok(records)
}
}
impl Default for DataMerger {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Date shared by every reading used in these tests.
    fn fixture_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2023, 7, 15).unwrap()
    }

    /// Station metadata shared by every test.
    fn fixture_station() -> StationMetadata {
        StationMetadata::new(
            12345,
            "Test Station".to_string(),
            "GB".to_string(),
            51.5074,
            -0.1278,
            Some(35),
        )
    }

    /// A day with all three readings yields exactly one record carrying the
    /// three temperatures and the combined quality flags.
    #[test]
    fn test_merge_complete_data() {
        let day = fixture_date();
        let station = fixture_station();

        let minimum = TemperatureRecord::new(12345, 101, day, 15.0, 0).unwrap();
        let maximum = TemperatureRecord::new(12345, 101, day, 25.0, 0).unwrap();
        let average = TemperatureRecord::new(12345, 101, day, 20.0, 0).unwrap();

        let merged = DataMerger::new()
            .merge_station_data(&station, vec![minimum], vec![maximum], vec![average])
            .unwrap();

        assert_eq!(merged.len(), 1);
        let only = &merged[0];
        assert_eq!(only.min_temp, 15.0);
        assert_eq!(only.max_temp, 25.0);
        assert_eq!(only.avg_temp, 20.0);
        assert_eq!(only.quality_flags, "000");
    }

    /// A day without an average reading is dropped by the strict merger and
    /// kept, with the missing-value flag in the middle position, by the
    /// lenient one.
    #[test]
    fn test_merge_incomplete_data() {
        let day = fixture_date();
        let station = fixture_station();

        let minimum = TemperatureRecord::new(12345, 101, day, 15.0, 0).unwrap();
        let maximum = TemperatureRecord::new(12345, 101, day, 25.0, 0).unwrap();

        // Strict policy: the incomplete day produces nothing.
        let strict = DataMerger::new();
        let strict_result = strict
            .merge_station_data(
                &station,
                vec![minimum.clone()],
                vec![maximum.clone()],
                vec![],
            )
            .unwrap();
        assert_eq!(strict_result.len(), 0);

        // Lenient policy: the day is kept and the average is flagged missing.
        let lenient = DataMerger::with_allow_incomplete(true);
        let lenient_result = lenient
            .merge_station_data(&station, vec![minimum], vec![maximum], vec![])
            .unwrap();
        assert_eq!(lenient_result.len(), 1);
        assert_eq!(lenient_result[0].quality_flags, "090");
    }
}