1use crate::error::{ProcessingError, Result};
2use crate::models::TemperatureRecord;
3use crate::utils::constants::DEFAULT_BUFFER_SIZE;
4use chrono::NaiveDate;
5use memmap2::Mmap;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9
/// Reads temperature records from comma-separated station files.
///
/// Blank lines are always ignored. When `skip_headers` is set, a fixed-size header
/// block at the top of each file is ignored as well. Files are read either through
/// a buffered reader or through a memory map (`use_mmap`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TemperatureReader {
    /// Skip the fixed-size header block at the start of each file.
    skip_headers: bool,
    /// Read files through a memory map instead of a buffered reader.
    use_mmap: bool,
}
14
15impl TemperatureReader {
16 pub fn new() -> Self {
17 Self {
18 skip_headers: true,
19 use_mmap: false,
20 }
21 }
22
23 pub fn with_skip_headers(skip_headers: bool) -> Self {
24 Self {
25 skip_headers,
26 use_mmap: false,
27 }
28 }
29
30 pub fn with_mmap(use_mmap: bool) -> Self {
31 Self {
32 skip_headers: true,
33 use_mmap,
34 }
35 }
36
37 pub fn read_temperatures(&self, path: &Path) -> Result<Vec<TemperatureRecord>> {
39 let station_id = self.extract_station_id_from_path(path)?;
40 self.read_temperatures_with_station_id(path, station_id)
41 }
42
43 pub fn read_temperatures_with_station_id(
45 &self,
46 path: &Path,
47 station_id: u32,
48 ) -> Result<Vec<TemperatureRecord>> {
49 if self.use_mmap {
50 self.read_temperatures_mmap(path, station_id)
51 } else {
52 self.read_temperatures_buffered(path, station_id)
53 }
54 }
55
56 pub fn extract_station_id_from_path(&self, path: &Path) -> Result<u32> {
58 let filename = path
59 .file_name()
60 .and_then(|f| f.to_str())
61 .ok_or_else(|| ProcessingError::InvalidFormat("Invalid file path".to_string()))?;
62
63 if let Some(staid_part) = filename
65 .strip_suffix(".txt")
66 .and_then(|s| s.find("STAID").map(|pos| &s[pos + 5..]))
67 {
68 staid_part
69 .trim_start_matches('0')
70 .parse::<u32>()
71 .map_err(|_| {
72 ProcessingError::InvalidFormat(format!(
73 "Could not extract station ID from filename: {}",
74 filename
75 ))
76 })
77 } else {
78 Err(ProcessingError::InvalidFormat(format!(
79 "Filename does not match expected pattern: {}",
80 filename
81 )))
82 }
83 }
84
85 fn read_temperatures_buffered(
87 &self,
88 path: &Path,
89 station_id: u32,
90 ) -> Result<Vec<TemperatureRecord>> {
91 let file = File::open(path)?;
92 let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file);
93 let mut records = Vec::new();
94 let mut line_count = 0;
95
96 for line_result in reader.lines() {
97 let line = line_result?;
98 line_count += 1;
99
100 if line.trim().is_empty() {
102 continue;
103 }
104
105 if self.skip_headers && line_count <= 20 {
107 continue;
108 }
109
110 if let Some(record) = self.parse_temperature_line(&line, station_id)? {
112 records.push(record);
113 }
114 }
115
116 Ok(records)
117 }
118
119 fn read_temperatures_mmap(
121 &self,
122 path: &Path,
123 station_id: u32,
124 ) -> Result<Vec<TemperatureRecord>> {
125 let file = File::open(path)?;
126 let mmap = unsafe { Mmap::map(&file)? };
127 let content = std::str::from_utf8(&mmap)
128 .map_err(|e| ProcessingError::InvalidFormat(format!("Invalid UTF-8: {}", e)))?;
129
130 let mut records = Vec::new();
131 let mut line_count = 0;
132
133 for line in content.lines() {
134 line_count += 1;
135
136 if line.trim().is_empty() {
138 continue;
139 }
140
141 if self.skip_headers && line_count <= 20 {
143 continue;
144 }
145
146 if let Some(record) = self.parse_temperature_line(line, station_id)? {
148 records.push(record);
149 }
150 }
151
152 Ok(records)
153 }
154
155 fn parse_temperature_line(
157 &self,
158 line: &str,
159 station_id: u32,
160 ) -> Result<Option<TemperatureRecord>> {
161 let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
163
164 if parts.len() < 4 {
165 return Ok(None); }
167
168 let souid = parts[0].parse::<u32>().map_err(|_| {
170 ProcessingError::InvalidFormat(format!("Invalid source ID: '{}'", parts[0]))
171 })?;
172
173 let date_str = parts[1];
175 let date = NaiveDate::parse_from_str(date_str, "%Y%m%d").map_err(|_| {
176 ProcessingError::InvalidFormat(format!("Invalid date format: '{}'", date_str))
177 })?;
178
179 let temp_str = parts[2];
181 if temp_str == "-9999" {
182 return Ok(None); }
184
185 let temp_tenths = temp_str.parse::<i32>().map_err(|_| {
186 ProcessingError::InvalidFormat(format!("Invalid temperature: '{}'", temp_str))
187 })?;
188
189 let temperature = temp_tenths as f32 / 10.0;
190
191 let quality_flag = parts[3].parse::<u8>().map_err(|_| {
193 ProcessingError::InvalidFormat(format!("Invalid quality flag: '{}'", parts[3]))
194 })?;
195
196 let record = TemperatureRecord::new(station_id, souid, date, temperature, quality_flag)?;
197
198 Ok(Some(record))
199 }
200
201 pub fn read_station_temperatures(
203 &self,
204 path: &Path,
205 station_id: u32,
206 ) -> Result<Vec<TemperatureRecord>> {
207 let all_records = self.read_temperatures(path)?;
208 Ok(all_records
209 .into_iter()
210 .filter(|r| r.staid == station_id)
211 .collect())
212 }
213
214 pub fn stream_temperatures<'a>(&self, path: &'a Path) -> Result<TemperatureIterator<'a>> {
216 TemperatureIterator::new(path, self.skip_headers)
217 }
218}
219
220impl Default for TemperatureReader {
221 fn default() -> Self {
222 Self::new()
223 }
224}
225
226pub struct TemperatureIterator<'a> {
228 reader: BufReader<File>,
229 skip_headers: bool,
230 line_count: usize,
231 _path: &'a Path,
232}
233
234impl<'a> TemperatureIterator<'a> {
235 fn new(path: &'a Path, skip_headers: bool) -> Result<Self> {
236 let file = File::open(path)?;
237 let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file);
238
239 Ok(Self {
240 reader,
241 skip_headers,
242 line_count: 0,
243 _path: path,
244 })
245 }
246}
247
248impl Iterator for TemperatureIterator<'_> {
249 type Item = Result<TemperatureRecord>;
250
251 fn next(&mut self) -> Option<Self::Item> {
252 let mut line = String::new();
253
254 loop {
255 line.clear();
256
257 match self.reader.read_line(&mut line) {
258 Ok(0) => return None, Ok(_) => {
260 self.line_count += 1;
261
262 if line.trim().is_empty() {
264 continue;
265 }
266
267 if self.skip_headers && self.line_count <= 20 {
269 continue;
270 }
271
272 let reader = TemperatureReader::new();
274 let station_id = reader
275 .extract_station_id_from_path(self._path)
276 .ok()
277 .unwrap_or(0);
278 match reader.parse_temperature_line(&line, station_id) {
279 Ok(Some(record)) => return Some(Ok(record)),
280 Ok(None) => continue, Err(e) => return Some(Err(e)),
282 }
283 }
284 Err(e) => return Some(Err(e.into())),
285 }
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Data rows shared by the file-based tests: two readings from source 101, one row
    /// carrying the `-9999` sentinel (skipped by the parser), and one reading from
    /// source 102.
    const SAMPLE_ROWS: [&str; 4] = [
        " 101, 20230101, 125, 0",
        " 101, 20230102, 130, 0",
        " 101, 20230103, -9999, 9",
        " 102, 20230101, 145, 0",
    ];

    /// Writes 20 placeholder header lines followed by `rows` to `file_name` inside a
    /// fresh temporary directory, and returns the directory guard plus the file path.
    ///
    /// Each test gets its own directory, so tests running in parallel cannot clobber
    /// each other's files. The file is deleted when the returned `TempDir` is dropped,
    /// even if the test bails out early with `?` or fails an assertion.
    fn write_station_file(file_name: &str, rows: &[&str]) -> Result<(TempDir, PathBuf)> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join(file_name);
        let mut file = File::create(&path)?;
        for _ in 0..20 {
            writeln!(file, "Header line")?;
        }
        for row in rows {
            writeln!(file, "{}", row)?;
        }
        Ok((dir, path))
    }

    #[test]
    fn test_parse_temperature_line() {
        let reader = TemperatureReader::new();

        let line = " 101, 19500101, 125, 0";
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();

        assert_eq!(record.staid, 257);
        assert_eq!(record.souid, 101);
        assert_eq!(record.date.format("%Y-%m-%d").to_string(), "1950-01-01");
        assert_eq!(record.temperature, 12.5);
        assert_eq!(record.quality_flag, 0);
    }

    #[test]
    fn test_extract_station_id_from_path() {
        let reader = TemperatureReader::new();

        assert_eq!(
            reader
                .extract_station_id_from_path(Path::new("data/TG_STAID000257.txt"))
                .unwrap(),
            257
        );
        assert!(reader
            .extract_station_id_from_path(Path::new("data/TG_SOUID000257.txt"))
            .is_err());
        assert!(reader
            .extract_station_id_from_path(Path::new("data/TG_STAID000257.csv"))
            .is_err());
    }

    #[test]
    fn test_read_temperature_file() -> Result<()> {
        let (_dir, path) = write_station_file("TN_STAID000257.txt", &SAMPLE_ROWS)?;

        let records = TemperatureReader::new().read_temperatures(&path)?;

        assert_eq!(records.len(), 3); // the -9999 row is skipped
        assert_eq!(records[0].staid, 257);
        assert_eq!(records[0].temperature, 12.5);
        assert_eq!(records[1].temperature, 13.0);
        assert_eq!(records[2].temperature, 14.5);

        Ok(())
    }

    #[test]
    fn test_mmap_and_stream_match_buffered() -> Result<()> {
        let (_dir, path) = write_station_file("TN_STAID000257.txt", &SAMPLE_ROWS)?;

        let buffered = TemperatureReader::new().read_temperatures(&path)?;
        let mapped = TemperatureReader::with_mmap(true).read_temperatures(&path)?;
        let streamed = TemperatureReader::new()
            .stream_temperatures(&path)?
            .collect::<Result<Vec<_>>>()?;

        for other in [&mapped, &streamed].iter() {
            assert_eq!(other.len(), buffered.len());
            for (expected, actual) in buffered.iter().zip(other.iter()) {
                assert_eq!(expected.staid, actual.staid);
                assert_eq!(expected.souid, actual.souid);
                assert_eq!(expected.date, actual.date);
                assert_eq!(expected.temperature, actual.temperature);
                assert_eq!(expected.quality_flag, actual.quality_flag);
            }
        }

        Ok(())
    }

    #[test]
    fn test_temperature_validation() {
        let reader = TemperatureReader::new();

        let line = " 101, 20230101, 250, 0";
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();
        assert!(record.validate().is_ok());

        let line = " 101, 20230101, 600, 0";
        let record = reader.parse_temperature_line(line, 257).unwrap().unwrap();
        assert!(record.validate().is_err());
    }
}