ecad_processor/processors/data_merger.rs

1use crate::error::Result;
2use crate::models::{ConsolidatedRecord, StationMetadata, TemperatureRecord, TemperatureSet};
3use crate::readers::TemperatureData;
4use chrono::NaiveDate;
5use std::collections::HashMap;
6
/// Combines per-series temperature readings (min / max / avg) into one
/// consolidated record per station and date.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DataMerger {
    /// When `true`, station-days missing one or more of the min/max/avg
    /// series are still emitted (with placeholder values) instead of dropped.
    allow_incomplete: bool,
}
10
11impl DataMerger {
12    pub fn new() -> Self {
13        Self {
14            allow_incomplete: false,
15        }
16    }
17
18    pub fn with_allow_incomplete(allow_incomplete: bool) -> Self {
19        Self { allow_incomplete }
20    }
21
22    /// Merge temperature data into consolidated records
23    pub fn merge_temperature_data(
24        &self,
25        temperature_data: &TemperatureData,
26    ) -> Result<Vec<ConsolidatedRecord>> {
27        let mut consolidated_records = Vec::new();
28
29        // Group temperature records by station and date
30        let grouped_data = self.group_by_station_and_date(temperature_data)?;
31
32        // Process each station-date combination
33        for ((station_id, date), temp_set) in grouped_data {
34            if let Some(station) = temperature_data.stations.get(&station_id) {
35                if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
36                    consolidated_records.push(record);
37                }
38            }
39        }
40
41        // Sort by station ID and date
42        consolidated_records.sort_by(|a, b| {
43            a.station_id
44                .cmp(&b.station_id)
45                .then_with(|| a.date.cmp(&b.date))
46        });
47
48        Ok(consolidated_records)
49    }
50
51    /// Group temperature records by station ID and date
52    fn group_by_station_and_date(
53        &self,
54        temperature_data: &TemperatureData,
55    ) -> Result<HashMap<(u32, NaiveDate), TemperatureSet>> {
56        let mut grouped = HashMap::new();
57
58        // Process minimum temperatures
59        for ((station_id, date), record) in &temperature_data.min_temperatures {
60            let entry = grouped
61                .entry((*station_id, *date))
62                .or_insert(TemperatureSet::default());
63            entry.min = Some(record.clone());
64        }
65
66        // Process maximum temperatures
67        for ((station_id, date), record) in &temperature_data.max_temperatures {
68            let entry = grouped
69                .entry((*station_id, *date))
70                .or_insert(TemperatureSet::default());
71            entry.max = Some(record.clone());
72        }
73
74        // Process average temperatures
75        for ((station_id, date), record) in &temperature_data.avg_temperatures {
76            let entry = grouped
77                .entry((*station_id, *date))
78                .or_insert(TemperatureSet::default());
79            entry.avg = Some(record.clone());
80        }
81
82        Ok(grouped)
83    }
84
85    /// Create a consolidated record from station metadata and temperature set
86    fn create_consolidated_record(
87        &self,
88        station: &StationMetadata,
89        date: NaiveDate,
90        temp_set: TemperatureSet,
91    ) -> Result<Option<ConsolidatedRecord>> {
92        // Check if we have all required data
93        if !self.allow_incomplete
94            && (temp_set.min.is_none() || temp_set.max.is_none() || temp_set.avg.is_none())
95        {
96            return Ok(None);
97        }
98
99        // Skip validation for now due to different measurement sources
100        // temp_set.validate_relationships()?;
101
102        // Extract temperatures with defaults for missing values
103        let (min_temp, min_quality) = temp_set
104            .min
105            .as_ref()
106            .map(|r| (r.temperature, r.quality_flag))
107            .unwrap_or((-9999.0, 9));
108
109        let (max_temp, max_quality) = temp_set
110            .max
111            .as_ref()
112            .map(|r| (r.temperature, r.quality_flag))
113            .unwrap_or((-9999.0, 9));
114
115        let (avg_temp, avg_quality) = temp_set
116            .avg
117            .as_ref()
118            .map(|r| (r.temperature, r.quality_flag))
119            .unwrap_or((-9999.0, 9));
120
121        // Skip records where all temperatures are missing
122        if min_temp == -9999.0 && max_temp == -9999.0 && avg_temp == -9999.0 {
123            return Ok(None);
124        }
125
126        // Build quality flags string (min, avg, max order)
127        let quality_flags = format!("{}{}{}", min_quality, avg_quality, max_quality);
128
129        // Create consolidated record (skip validation for missing data)
130        let record = ConsolidatedRecord::new(
131            station.staid,
132            station.name.clone(),
133            date,
134            station.latitude,
135            station.longitude,
136            min_temp,
137            max_temp,
138            avg_temp,
139            quality_flags,
140        );
141
142        // Skip relationship validation for now due to different measurement sources
143        // TODO: Implement more sophisticated validation that accounts for different SOUIDs
144        // if min_temp != -9999.0 && max_temp != -9999.0 && avg_temp != -9999.0 {
145        //     record.validate_relationships()?;
146        // }
147
148        Ok(Some(record))
149    }
150
151    /// Merge data for a specific station
152    pub fn merge_station_data(
153        &self,
154        station: &StationMetadata,
155        min_temps: Vec<TemperatureRecord>,
156        max_temps: Vec<TemperatureRecord>,
157        avg_temps: Vec<TemperatureRecord>,
158    ) -> Result<Vec<ConsolidatedRecord>> {
159        let mut temp_map: HashMap<NaiveDate, TemperatureSet> = HashMap::new();
160
161        // Add minimum temperatures
162        for record in min_temps {
163            let date = record.date;
164            temp_map.entry(date).or_default().min = Some(record);
165        }
166
167        // Add maximum temperatures
168        for record in max_temps {
169            let date = record.date;
170            temp_map.entry(date).or_default().max = Some(record);
171        }
172
173        // Add average temperatures
174        for record in avg_temps {
175            let date = record.date;
176            temp_map.entry(date).or_default().avg = Some(record);
177        }
178
179        // Convert to consolidated records
180        let mut records = Vec::new();
181        for (date, temp_set) in temp_map {
182            if let Some(record) = self.create_consolidated_record(station, date, temp_set)? {
183                records.push(record);
184            }
185        }
186
187        // Sort by date
188        records.sort_by_key(|r| r.date);
189
190        Ok(records)
191    }
192}
193
194impl Default for DataMerger {
195    fn default() -> Self {
196        Self::new()
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// Station fixture shared by the tests below.
    fn test_station() -> StationMetadata {
        StationMetadata::new(
            12345,
            "Test Station".to_string(),
            "GB".to_string(),
            51.5074,
            -0.1278,
            Some(35),
        )
    }

    #[test]
    fn test_merge_complete_data() {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        let station = test_station();

        let min_temp = TemperatureRecord::new(12345, 101, date, 15.0, 0).unwrap();
        let max_temp = TemperatureRecord::new(12345, 101, date, 25.0, 0).unwrap();
        let avg_temp = TemperatureRecord::new(12345, 101, date, 20.0, 0).unwrap();

        let merger = DataMerger::new();
        let records = merger
            .merge_station_data(&station, vec![min_temp], vec![max_temp], vec![avg_temp])
            .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].min_temp, 15.0);
        assert_eq!(records[0].max_temp, 25.0);
        assert_eq!(records[0].avg_temp, 20.0);
        assert_eq!(records[0].quality_flags, "000");
    }

    #[test]
    fn test_merge_incomplete_data() {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        let station = test_station();

        let min_temp = TemperatureRecord::new(12345, 101, date, 15.0, 0).unwrap();
        let max_temp = TemperatureRecord::new(12345, 101, date, 25.0, 0).unwrap();
        // No average temperature

        let merger = DataMerger::new();
        let records = merger
            .merge_station_data(
                &station,
                vec![min_temp.clone()],
                vec![max_temp.clone()],
                vec![],
            )
            .unwrap();

        // A strict merger drops the day because the average temperature is missing.
        assert_eq!(records.len(), 0);

        // Now with allow_incomplete
        let merger = DataMerger::with_allow_incomplete(true);
        let records = merger
            .merge_station_data(&station, vec![min_temp], vec![max_temp], vec![])
            .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].quality_flags, "090"); // Missing average (9 in position 2)
    }

    #[test]
    fn test_merge_station_data_sorted_by_date() {
        let earlier = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        let later = NaiveDate::from_ymd_opt(2023, 7, 16).unwrap();
        let station = test_station();

        // The later date is supplied first so the test exercises the sort.
        let min_temps = vec![
            TemperatureRecord::new(12345, 101, later, 16.0, 0).unwrap(),
            TemperatureRecord::new(12345, 101, earlier, 15.0, 0).unwrap(),
        ];
        let max_temps = vec![
            TemperatureRecord::new(12345, 101, later, 26.0, 0).unwrap(),
            TemperatureRecord::new(12345, 101, earlier, 25.0, 0).unwrap(),
        ];
        let avg_temps = vec![
            TemperatureRecord::new(12345, 101, later, 21.0, 0).unwrap(),
            TemperatureRecord::new(12345, 101, earlier, 20.0, 0).unwrap(),
        ];

        let records = DataMerger::new()
            .merge_station_data(&station, min_temps, max_temps, avg_temps)
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].date, earlier);
        assert_eq!(records[0].min_temp, 15.0);
        assert_eq!(records[1].date, later);
        assert_eq!(records[1].min_temp, 16.0);
    }

    #[test]
    fn test_quality_flags_ordered_min_avg_max() {
        let date = NaiveDate::from_ymd_opt(2023, 7, 15).unwrap();
        let station = test_station();

        let min_temp = TemperatureRecord::new(12345, 101, date, 15.0, 0).unwrap();
        let max_temp = TemperatureRecord::new(12345, 101, date, 25.0, 1).unwrap();
        let avg_temp = TemperatureRecord::new(12345, 101, date, 20.0, 0).unwrap();

        let records = DataMerger::new()
            .merge_station_data(&station, vec![min_temp], vec![max_temp], vec![avg_temp])
            .unwrap();

        assert_eq!(records.len(), 1);
        // The max flag is written last.
        assert_eq!(records[0].quality_flags, "001");
    }
}