ecad_processor/readers/
concurrent_reader.rs

1use crate::error::{ProcessingError, Result};
2use crate::models::{StationMetadata, TemperatureRecord};
3use crate::readers::{StationReader, TemperatureReader};
4use crate::utils::constants::{STATIONS_FILE, UK_TEMP_AVG_DIR, UK_TEMP_MAX_DIR, UK_TEMP_MIN_DIR};
5use rayon::prelude::*;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10
/// Reads station metadata and temperature files, using several workers where possible.
///
/// Derives `Debug` like the other public types in this file so a reader can be
/// logged or embedded in `Debug`-deriving structs.
#[derive(Debug)]
pub struct ConcurrentReader {
    /// Worker count requested by the caller when the reader was constructed.
    max_workers: usize,
}
14
15impl ConcurrentReader {
16    pub fn new(max_workers: usize) -> Self {
17        Self { max_workers }
18    }
19
20    /// Read all temperature data concurrently
21    pub async fn read_all_temperature_data(&self, base_path: &Path) -> Result<TemperatureData> {
22        // Read station metadata first
23        let stations_path = base_path.join(UK_TEMP_MIN_DIR).join(STATIONS_FILE);
24        let station_reader = StationReader::new();
25        let stations = station_reader.read_stations_map(&stations_path)?;
26        let stations = Arc::new(stations);
27
28        // Define paths for each temperature type
29        let min_path = base_path.join(UK_TEMP_MIN_DIR);
30        let max_path = base_path.join(UK_TEMP_MAX_DIR);
31        let avg_path = base_path.join(UK_TEMP_AVG_DIR);
32
33        // Read temperature files concurrently
34        let stations_clone1 = stations.clone();
35        let stations_clone2 = stations.clone();
36        let stations_clone3 = stations.clone();
37
38        let max_workers = self.max_workers;
39
40        let min_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
41            tokio::spawn(async move {
42                Self::read_temperature_type_parallel_static(&min_path, stations_clone1, max_workers)
43                    .await
44            });
45
46        let max_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
47            tokio::spawn(async move {
48                Self::read_temperature_type_parallel_static(&max_path, stations_clone2, max_workers)
49                    .await
50            });
51
52        let avg_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
53            tokio::spawn(async move {
54                Self::read_temperature_type_parallel_static(&avg_path, stations_clone3, max_workers)
55                    .await
56            });
57
58        // Wait for all reads to complete
59        let (min_temps, max_temps, avg_temps) =
60            tokio::try_join!(min_handle, max_handle, avg_handle)?;
61
62        Ok(TemperatureData {
63            stations: Arc::try_unwrap(stations).unwrap_or_else(|arc| (*arc).clone()),
64            min_temperatures: min_temps?,
65            max_temperatures: max_temps?,
66            avg_temperatures: avg_temps?,
67        })
68    }
69
70    /// Read temperature files for a specific type using parallel processing
71    async fn read_temperature_type_parallel_static(
72        dir_path: &Path,
73        stations: Arc<HashMap<u32, StationMetadata>>,
74        _max_workers: usize,
75    ) -> Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>> {
76        // Find all temperature files for UK stations
77        // Determine file prefix based on directory name
78        let file_prefix = match dir_path.file_name().and_then(|f| f.to_str()) {
79            Some("uk_temp_min") => "TN",
80            Some("uk_temp_max") => "TX",
81            Some("uk_temp_avg") => "TG",
82            _ => {
83                return Err(ProcessingError::InvalidFormat(format!(
84                    "Unknown temperature directory: {:?}",
85                    dir_path
86                )))
87            }
88        };
89
90        let temperature_files: Vec<PathBuf> =
91            Self::find_temperature_files_static(dir_path, &stations, file_prefix)?;
92
93        // Process files in parallel using Rayon
94        let all_records: Vec<Vec<TemperatureRecord>> = temperature_files
95            .par_iter()
96            .map(|path| {
97                let reader = TemperatureReader::new();
98                reader.read_temperatures(path)
99            })
100            .collect::<Result<Vec<_>>>()?;
101
102        // Flatten and convert to HashMap
103        let mut temperature_map = HashMap::new();
104        for records in all_records {
105            for record in records {
106                temperature_map.insert((record.staid, record.date), record);
107            }
108        }
109
110        Ok(temperature_map)
111    }
112
113    /// Find temperature files for UK stations with specific prefix
114    fn find_temperature_files_static(
115        dir_path: &Path,
116        stations: &HashMap<u32, StationMetadata>,
117        file_prefix: &str,
118    ) -> Result<Vec<PathBuf>> {
119        let mut files = Vec::new();
120
121        // Look for files with pattern {prefix}_STAID*.txt
122        for entry in std::fs::read_dir(dir_path)? {
123            let entry = entry?;
124            let path = entry.path();
125
126            if path.is_file() {
127                if let Some(file_name) = path.file_name() {
128                    let file_name_str = file_name.to_string_lossy();
129
130                    // Check if it's a temperature data file with the right prefix
131                    if file_name_str.starts_with(&format!("{}_STAID", file_prefix))
132                        && file_name_str.ends_with(".txt")
133                    {
134                        // Extract station ID from filename
135                        if let Some(staid_str) = file_name_str
136                            .strip_prefix(&format!("{}_STAID", file_prefix))
137                            .and_then(|s| s.strip_suffix(".txt"))
138                        {
139                            if let Ok(staid) = staid_str.trim_start_matches('0').parse::<u32>() {
140                                // Only include UK stations
141                                if let Some(station) = stations.get(&staid) {
142                                    if station.is_uk_station() {
143                                        files.push(path);
144                                    }
145                                }
146                            }
147                        }
148                    }
149                }
150            }
151        }
152
153        Ok(files)
154    }
155
156    /// Process a single station's data across all temperature types
157    pub fn process_station_data(
158        &self,
159        station_id: u32,
160        base_path: &Path,
161    ) -> Result<StationTemperatureData> {
162        let min_path = base_path.join(UK_TEMP_MIN_DIR);
163        let max_path = base_path.join(UK_TEMP_MAX_DIR);
164        let avg_path = base_path.join(UK_TEMP_AVG_DIR);
165
166        let reader = TemperatureReader::new();
167
168        // Read temperature data for this station
169        let min_file = min_path.join(format!("TG_STAID{:06}.txt", station_id));
170        let max_file = max_path.join(format!("TG_STAID{:06}.txt", station_id));
171        let avg_file = avg_path.join(format!("TG_STAID{:06}.txt", station_id));
172
173        let min_temps = if min_file.exists() {
174            reader.read_temperatures(&min_file)?
175        } else {
176            Vec::new()
177        };
178
179        let max_temps = if max_file.exists() {
180            reader.read_temperatures(&max_file)?
181        } else {
182            Vec::new()
183        };
184
185        let avg_temps = if avg_file.exists() {
186            reader.read_temperatures(&avg_file)?
187        } else {
188            Vec::new()
189        };
190
191        Ok(StationTemperatureData {
192            station_id,
193            min_temperatures: min_temps,
194            max_temperatures: max_temps,
195            avg_temperatures: avg_temps,
196        })
197    }
198}
199
200impl Default for ConcurrentReader {
201    fn default() -> Self {
202        Self::new(num_cpus::get())
203    }
204}
205
206/// Container for all temperature data
207#[derive(Debug)]
208pub struct TemperatureData {
209    pub stations: HashMap<u32, StationMetadata>,
210    pub min_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
211    pub max_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
212    pub avg_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
213}
214
215/// Container for a single station's temperature data
216#[derive(Debug)]
217pub struct StationTemperatureData {
218    pub station_id: u32,
219    pub min_temperatures: Vec<TemperatureRecord>,
220    pub max_temperatures: Vec<TemperatureRecord>,
221    pub avg_temperatures: Vec<TemperatureRecord>,
222}