ecad_processor/readers/
temperature_reader.rs

1use crate::error::{ProcessingError, Result};
2use crate::models::TemperatureRecord;
3use crate::utils::constants::DEFAULT_BUFFER_SIZE;
4use chrono::NaiveDate;
5use memmap2::Mmap;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9
/// Reader for daily station temperature files such as `TN_STAID000257.txt`.
///
/// The reader can skip the file header and can read either through a buffered
/// reader or a memory map.
#[derive(Debug, Clone)]
pub struct TemperatureReader {
    /// Skip the leading 20-line header of each file.
    skip_headers: bool,
    /// Read files through a memory map instead of a buffered reader.
    use_mmap: bool,
}
14
15impl TemperatureReader {
16    pub fn new() -> Self {
17        Self {
18            skip_headers: true,
19            use_mmap: false,
20        }
21    }
22
23    pub fn with_skip_headers(skip_headers: bool) -> Self {
24        Self {
25            skip_headers,
26            use_mmap: false,
27        }
28    }
29
30    pub fn with_mmap(use_mmap: bool) -> Self {
31        Self {
32            skip_headers: true,
33            use_mmap,
34        }
35    }
36
37    /// Read temperature records from a file (extracts station ID from filename)
38    pub fn read_temperatures(&self, path: &Path) -> Result<Vec<TemperatureRecord>> {
39        let station_id = self.extract_station_id_from_path(path)?;
40        self.read_temperatures_with_station_id(path, station_id)
41    }
42
43    /// Read temperature records from a file with explicit station ID
44    pub fn read_temperatures_with_station_id(
45        &self,
46        path: &Path,
47        station_id: u32,
48    ) -> Result<Vec<TemperatureRecord>> {
49        if self.use_mmap {
50            self.read_temperatures_mmap(path, station_id)
51        } else {
52            self.read_temperatures_buffered(path, station_id)
53        }
54    }
55
56    /// Extract station ID from filename (e.g., TN_STAID000257.txt -> 257)
57    pub fn extract_station_id_from_path(&self, path: &Path) -> Result<u32> {
58        let filename = path
59            .file_name()
60            .and_then(|f| f.to_str())
61            .ok_or_else(|| ProcessingError::InvalidFormat("Invalid file path".to_string()))?;
62
63        // Extract station ID from patterns like TN_STAID000257.txt, TX_STAID000257.txt, TG_STAID000257.txt
64        if let Some(staid_part) = filename
65            .strip_suffix(".txt")
66            .and_then(|s| s.find("STAID").map(|pos| &s[pos + 5..]))
67        {
68            staid_part
69                .trim_start_matches('0')
70                .parse::<u32>()
71                .map_err(|_| {
72                    ProcessingError::InvalidFormat(format!(
73                        "Could not extract station ID from filename: {}",
74                        filename
75                    ))
76                })
77        } else {
78            Err(ProcessingError::InvalidFormat(format!(
79                "Filename does not match expected pattern: {}",
80                filename
81            )))
82        }
83    }
84
85    /// Read temperature records using buffered I/O
86    fn read_temperatures_buffered(
87        &self,
88        path: &Path,
89        station_id: u32,
90    ) -> Result<Vec<TemperatureRecord>> {
91        let file = File::open(path)?;
92        let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file);
93        let mut records = Vec::new();
94        let mut line_count = 0;
95
96        for line_result in reader.lines() {
97            let line = line_result?;
98            line_count += 1;
99
100            // Skip empty lines
101            if line.trim().is_empty() {
102                continue;
103            }
104
105            // Skip header lines
106            if self.skip_headers && line_count <= 20 {
107                continue;
108            }
109
110            // Parse temperature data
111            if let Some(record) = self.parse_temperature_line(&line, station_id)? {
112                records.push(record);
113            }
114        }
115
116        Ok(records)
117    }
118
119    /// Read temperature records using memory-mapped I/O for large files
120    fn read_temperatures_mmap(
121        &self,
122        path: &Path,
123        station_id: u32,
124    ) -> Result<Vec<TemperatureRecord>> {
125        let file = File::open(path)?;
126        let mmap = unsafe { Mmap::map(&file)? };
127        let content = std::str::from_utf8(&mmap)
128            .map_err(|e| ProcessingError::InvalidFormat(format!("Invalid UTF-8: {}", e)))?;
129
130        let mut records = Vec::new();
131        let mut line_count = 0;
132
133        for line in content.lines() {
134            line_count += 1;
135
136            // Skip empty lines
137            if line.trim().is_empty() {
138                continue;
139            }
140
141            // Skip header lines
142            if self.skip_headers && line_count <= 20 {
143                continue;
144            }
145
146            // Parse temperature data
147            if let Some(record) = self.parse_temperature_line(line, station_id)? {
148                records.push(record);
149            }
150        }
151
152        Ok(records)
153    }
154
155    /// Parse a single line from the temperature file with provided station ID
156    fn parse_temperature_line(
157        &self,
158        line: &str,
159        station_id: u32,
160    ) -> Result<Option<TemperatureRecord>> {
161        // Expected format: SOUID, DATE, TEMP, Q_TEMP
162        let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
163
164        if parts.len() < 4 {
165            return Ok(None); // Skip malformed lines
166        }
167
168        // Parse source ID
169        let souid = parts[0].parse::<u32>().map_err(|_| {
170            ProcessingError::InvalidFormat(format!("Invalid source ID: '{}'", parts[0]))
171        })?;
172
173        // Parse date (YYYYMMDD format)
174        let date_str = parts[1];
175        let date = NaiveDate::parse_from_str(date_str, "%Y%m%d").map_err(|_| {
176            ProcessingError::InvalidFormat(format!("Invalid date format: '{}'", date_str))
177        })?;
178
179        // Parse temperature (in 0.1 degrees Celsius)
180        let temp_str = parts[2];
181        if temp_str == "-9999" {
182            return Ok(None); // Skip missing values
183        }
184
185        let temp_tenths = temp_str.parse::<i32>().map_err(|_| {
186            ProcessingError::InvalidFormat(format!("Invalid temperature: '{}'", temp_str))
187        })?;
188
189        let temperature = temp_tenths as f32 / 10.0;
190
191        // Parse quality flag
192        let quality_flag = parts[3].parse::<u8>().map_err(|_| {
193            ProcessingError::InvalidFormat(format!("Invalid quality flag: '{}'", parts[3]))
194        })?;
195
196        let record = TemperatureRecord::new(station_id, souid, date, temperature, quality_flag)?;
197
198        Ok(Some(record))
199    }
200
201    /// Read temperature records for a specific station
202    pub fn read_station_temperatures(
203        &self,
204        path: &Path,
205        station_id: u32,
206    ) -> Result<Vec<TemperatureRecord>> {
207        let all_records = self.read_temperatures(path)?;
208        Ok(all_records
209            .into_iter()
210            .filter(|r| r.staid == station_id)
211            .collect())
212    }
213
214    /// Stream temperature records using an iterator (memory efficient for large files)
215    pub fn stream_temperatures<'a>(&self, path: &'a Path) -> Result<TemperatureIterator<'a>> {
216        TemperatureIterator::new(path, self.skip_headers)
217    }
218}
219
220impl Default for TemperatureReader {
221    fn default() -> Self {
222        Self::new()
223    }
224}
225
226/// Iterator for streaming temperature records
227pub struct TemperatureIterator<'a> {
228    reader: BufReader<File>,
229    skip_headers: bool,
230    line_count: usize,
231    _path: &'a Path,
232}
233
234impl<'a> TemperatureIterator<'a> {
235    fn new(path: &'a Path, skip_headers: bool) -> Result<Self> {
236        let file = File::open(path)?;
237        let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file);
238
239        Ok(Self {
240            reader,
241            skip_headers,
242            line_count: 0,
243            _path: path,
244        })
245    }
246}
247
248impl Iterator for TemperatureIterator<'_> {
249    type Item = Result<TemperatureRecord>;
250
251    fn next(&mut self) -> Option<Self::Item> {
252        let mut line = String::new();
253
254        loop {
255            line.clear();
256
257            match self.reader.read_line(&mut line) {
258                Ok(0) => return None, // EOF
259                Ok(_) => {
260                    self.line_count += 1;
261
262                    // Skip empty lines
263                    if line.trim().is_empty() {
264                        continue;
265                    }
266
267                    // Skip headers
268                    if self.skip_headers && self.line_count <= 20 {
269                        continue;
270                    }
271
272                    // Parse line - TODO: Fix iterator to extract station ID from path
273                    let reader = TemperatureReader::new();
274                    let station_id = reader
275                        .extract_station_id_from_path(self._path)
276                        .ok()
277                        .unwrap_or(0);
278                    match reader.parse_temperature_line(&line, station_id) {
279                        Ok(Some(record)) => return Some(Ok(record)),
280                        Ok(None) => continue, // Skip invalid/missing data
281                        Err(e) => return Some(Err(e)),
282                    }
283                }
284                Err(e) => return Some(Err(e.into())),
285            }
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Write a station file named `TN_STAID000257.txt` into `dir` and return its
    /// path. The file has 20 header lines followed by four data rows, one of which
    /// is a missing value (`-9999`).
    fn write_station_file(dir: &Path) -> std::io::Result<PathBuf> {
        let mut contents = "Header line\n".repeat(20);
        // Data rows (SOUID, DATE, TEMP, Q_TEMP format)
        contents.push_str("  101, 20230101,  125, 0\n");
        contents.push_str("  101, 20230102,  130, 0\n");
        contents.push_str("  101, 20230103, -9999, 9\n"); // Missing data
        contents.push_str("  102, 20230101,  145, 0\n");

        // The name follows the STAID convention so the station ID can be extracted.
        let path = dir.join("TN_STAID000257.txt");
        std::fs::write(&path, contents)?;
        Ok(path)
    }

    #[test]
    fn test_parse_temperature_line() {
        let reader = TemperatureReader::new();

        // Format: SOUID, DATE, TEMP, Q_TEMP
        let line = "  101, 19500101,  125, 0";
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();

        assert_eq!(record.staid, 257);
        assert_eq!(record.souid, 101);
        assert_eq!(record.date.format("%Y-%m-%d").to_string(), "1950-01-01");
        assert_eq!(record.temperature, 12.5);
        assert_eq!(record.quality_flag, 0);
    }

    #[test]
    fn test_read_temperature_file() -> Result<()> {
        // A private temp dir avoids clashing with concurrent runs over a fixed file
        // name, and it is removed on drop even when an assertion fails.
        let dir = tempfile::tempdir()?;
        let path = write_station_file(dir.path())?;

        let records = TemperatureReader::new().read_temperatures(&path)?;

        assert_eq!(records.len(), 3); // Missing data excluded
        assert_eq!(records[0].staid, 257); // Station ID from filename
        assert_eq!(records[0].temperature, 12.5);
        assert_eq!(records[1].temperature, 13.0);
        assert_eq!(records[2].temperature, 14.5);

        Ok(())
    }

    #[test]
    fn test_mmap_matches_buffered() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = write_station_file(dir.path())?;

        let buffered = TemperatureReader::new().read_temperatures(&path)?;
        let mapped = TemperatureReader::with_mmap(true).read_temperatures(&path)?;

        assert_eq!(buffered.len(), mapped.len());
        for (b, m) in buffered.iter().zip(&mapped) {
            assert_eq!(b.staid, m.staid);
            assert_eq!(b.souid, m.souid);
            assert_eq!(b.date, m.date);
            assert_eq!(b.temperature, m.temperature);
            assert_eq!(b.quality_flag, m.quality_flag);
        }

        Ok(())
    }

    #[test]
    fn test_stream_temperatures() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = write_station_file(dir.path())?;

        let records = TemperatureReader::new()
            .stream_temperatures(&path)?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(records.len(), 3); // Missing data excluded
        assert!(records.iter().all(|r| r.staid == 257));
        assert_eq!(records[0].temperature, 12.5);
        assert_eq!(records[2].souid, 102);

        Ok(())
    }

    #[test]
    fn test_extract_station_id_from_path() {
        let reader = TemperatureReader::new();

        assert_eq!(
            reader
                .extract_station_id_from_path(Path::new("TX_STAID000257.txt"))
                .unwrap(),
            257
        );
        assert_eq!(
            reader
                .extract_station_id_from_path(Path::new("/data/TG_STAID000042.txt"))
                .unwrap(),
            42
        );
        assert!(reader
            .extract_station_id_from_path(Path::new("stations.txt"))
            .is_err());
        assert!(reader
            .extract_station_id_from_path(Path::new("TX_STAID000257.csv"))
            .is_err());
    }

    #[test]
    fn test_temperature_validation() {
        let reader = TemperatureReader::new();

        // Valid temperature
        let line = "  101, 20230101,  250, 0";
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();
        assert!(record.validate().is_ok());

        // Invalid temperature (out of range)
        let line = "  101, 20230101,  600, 0"; // 60°C
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();
        assert!(record.validate().is_err());
    }
}