ecad_processor/processors/
data_merger.rs1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, StationMetadata, TemperatureRecord, TemperatureSet};
3use crate::readers::TemperatureData;
4use chrono::NaiveDate;
5use std::collections::HashMap;
6
/// Combines separate minimum, maximum and average temperature readings into
/// one consolidated record per station and date.
///
/// Build it with `DataMerger::new` for the strict policy or with
/// `DataMerger::with_allow_incomplete` to choose the policy explicitly.
#[derive(Debug, Clone)]
pub struct DataMerger {
    /// When `false`, a station/date group is dropped unless the minimum,
    /// maximum and average readings are all present. When `true`, absent
    /// readings are replaced by placeholder values instead.
    allow_incomplete: bool,
}
10
11impl DataMerger {
12 pub fn new() -> Self {
13 Self {
14 allow_incomplete: false,
15 }
16 }
17
18 pub fn with_allow_incomplete(allow_incomplete: bool) -> Self {
19 Self { allow_incomplete }
20 }
21
22 pub fn merge_temperature_data(
24 &self,
25 temperature_data: &TemperatureData,
26 ) -> Result<Vec<ConsolidatedRecord>> {
27 let mut consolidated_records = Vec::new();
28
29 let grouped_data = self.group_by_station_and_date(temperature_data)?;
31
32 for ((station_id, date), temp_set) in grouped_data {
34 if let Some(station) = temperature_data.stations.get(&station_id) {
35 if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
36 consolidated_records.push(record);
37 }
38 }
39 }
40
41 consolidated_records.sort_by(|a, b| {
43 a.station_id
44 .cmp(&b.station_id)
45 .then_with(|| a.date.cmp(&b.date))
46 });
47
48 Ok(consolidated_records)
49 }
50
51 fn group_by_station_and_date(
53 &self,
54 temperature_data: &TemperatureData,
55 ) -> Result<HashMap<(u32, NaiveDate), TemperatureSet>> {
56 let mut grouped = HashMap::new();
57
58 for ((station_id, date), record) in &temperature_data.min_temperatures {
60 let entry = grouped
61 .entry((*station_id, *date))
62 .or_insert(TemperatureSet::default());
63 entry.min = Some(record.clone());
64 }
65
66 for ((station_id, date), record) in &temperature_data.max_temperatures {
68 let entry = grouped
69 .entry((*station_id, *date))
70 .or_insert(TemperatureSet::default());
71 entry.max = Some(record.clone());
72 }
73
74 for ((station_id, date), record) in &temperature_data.avg_temperatures {
76 let entry = grouped
77 .entry((*station_id, *date))
78 .or_insert(TemperatureSet::default());
79 entry.avg = Some(record.clone());
80 }
81
82 Ok(grouped)
83 }
84
85 fn create_consolidated_record(
87 &self,
88 station: &StationMetadata,
89 date: NaiveDate,
90 temp_set: TemperatureSet,
91 ) -> Result<Option<ConsolidatedRecord>> {
92 if !self.allow_incomplete
94 && (temp_set.min.is_none() || temp_set.max.is_none() || temp_set.avg.is_none())
95 {
96 return Ok(None);
97 }
98
99 let (min_temp, min_quality) = temp_set
104 .min
105 .as_ref()
106 .map(|r| (r.temperature, r.quality_flag))
107 .unwrap_or((-9999.0, 9));
108
109 let (max_temp, max_quality) = temp_set
110 .max
111 .as_ref()
112 .map(|r| (r.temperature, r.quality_flag))
113 .unwrap_or((-9999.0, 9));
114
115 let (avg_temp, avg_quality) = temp_set
116 .avg
117 .as_ref()
118 .map(|r| (r.temperature, r.quality_flag))
119 .unwrap_or((-9999.0, 9));
120
121 if min_temp == -9999.0 && max_temp == -9999.0 && avg_temp == -9999.0 {
123 return Ok(None);
124 }
125
126 let quality_flags = format!("{}{}{}", min_quality, avg_quality, max_quality);
128
129 let record = ConsolidatedRecord::new(
131 station.staid,
132 station.name.clone(),
133 date,
134 station.latitude,
135 station.longitude,
136 min_temp,
137 max_temp,
138 avg_temp,
139 quality_flags,
140 );
141
142 Ok(Some(record))
149 }
150
151 pub fn merge_station_data(
153 &self,
154 station: &StationMetadata,
155 min_temps: Vec<TemperatureRecord>,
156 max_temps: Vec<TemperatureRecord>,
157 avg_temps: Vec<TemperatureRecord>,
158 ) -> Result<Vec<ConsolidatedRecord>> {
159 let mut temp_map: HashMap<NaiveDate, TemperatureSet> = HashMap::new();
160
161 for record in min_temps {
163 let date = record.date;
164 temp_map.entry(date).or_default().min = Some(record);
165 }
166
167 for record in max_temps {
169 let date = record.date;
170 temp_map.entry(date).or_default().max = Some(record);
171 }
172
173 for record in avg_temps {
175 let date = record.date;
176 temp_map.entry(date).or_default().avg = Some(record);
177 }
178
179 let mut records = Vec::new();
181 for (date, temp_set) in temp_map {
182 if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
183 records.push(record);
184 }
185 }
186
187 records.sort_by_key(|r| r.date);
189
190 Ok(records)
191 }
192}
193
194impl Default for DataMerger {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// The single calendar day shared by every reading in these tests.
    fn fixture_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2023, 7, 15).unwrap()
    }

    /// The station all test readings belong to.
    fn fixture_station() -> StationMetadata {
        StationMetadata::new(
            12345,
            "Test Station".to_string(),
            "GB".to_string(),
            51.5074,
            -0.1278,
            Some(35),
        )
    }

    /// All three readings present: exactly one record with the input values.
    #[test]
    fn test_merge_complete_data() {
        let date = fixture_date();
        let station = fixture_station();
        let reading = |value| TemperatureRecord::new(12345, 101, date, value, 0).unwrap();

        let records = DataMerger::new()
            .merge_station_data(
                &station,
                vec![reading(15.0)],
                vec![reading(25.0)],
                vec![reading(20.0)],
            )
            .unwrap();

        assert_eq!(records.len(), 1);
        let merged = &records[0];
        assert_eq!(merged.min_temp, 15.0);
        assert_eq!(merged.max_temp, 25.0);
        assert_eq!(merged.avg_temp, 20.0);
        assert_eq!(merged.quality_flags, "000");
    }

    /// Average reading absent: dropped by the strict merger, kept by the
    /// lenient one with flag `9` in the middle (average) position.
    #[test]
    fn test_merge_incomplete_data() {
        let date = fixture_date();
        let station = fixture_station();
        let reading = |value| TemperatureRecord::new(12345, 101, date, value, 0).unwrap();

        let strict = DataMerger::new()
            .merge_station_data(&station, vec![reading(15.0)], vec![reading(25.0)], vec![])
            .unwrap();

        assert_eq!(strict.len(), 0);

        let lenient = DataMerger::with_allow_incomplete(true)
            .merge_station_data(&station, vec![reading(15.0)], vec![reading(25.0)], vec![])
            .unwrap();

        assert_eq!(lenient.len(), 1);
        assert_eq!(lenient[0].quality_flags, "090");
    }
}