ecad_processor/readers/
concurrent_reader.rs1use crate::error::{ProcessingError, Result};
2use crate::models::{StationMetadata, TemperatureRecord};
3use crate::readers::{StationReader, TemperatureReader};
4use crate::utils::constants::{STATIONS_FILE, UK_TEMP_AVG_DIR, UK_TEMP_MAX_DIR, UK_TEMP_MIN_DIR};
5use rayon::prelude::*;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10
/// Reader that loads station metadata and the per-station temperature files
/// (minimum, maximum and average) found under a base directory.
pub struct ConcurrentReader {
    /// Worker count supplied at construction time; it is forwarded to the
    /// per-directory readers when temperature data is loaded.
    max_workers: usize,
}
14
15impl ConcurrentReader {
16 pub fn new(max_workers: usize) -> Self {
17 Self { max_workers }
18 }
19
20 pub async fn read_all_temperature_data(&self, base_path: &Path) -> Result<TemperatureData> {
22 let stations_path = base_path.join(UK_TEMP_MIN_DIR).join(STATIONS_FILE);
24 let station_reader = StationReader::new();
25 let stations = station_reader.read_stations_map(&stations_path)?;
26 let stations = Arc::new(stations);
27
28 let min_path = base_path.join(UK_TEMP_MIN_DIR);
30 let max_path = base_path.join(UK_TEMP_MAX_DIR);
31 let avg_path = base_path.join(UK_TEMP_AVG_DIR);
32
33 let stations_clone1 = stations.clone();
35 let stations_clone2 = stations.clone();
36 let stations_clone3 = stations.clone();
37
38 let max_workers = self.max_workers;
39
40 let min_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
41 tokio::spawn(async move {
42 Self::read_temperature_type_parallel_static(&min_path, stations_clone1, max_workers)
43 .await
44 });
45
46 let max_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
47 tokio::spawn(async move {
48 Self::read_temperature_type_parallel_static(&max_path, stations_clone2, max_workers)
49 .await
50 });
51
52 let avg_handle: JoinHandle<Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>>> =
53 tokio::spawn(async move {
54 Self::read_temperature_type_parallel_static(&avg_path, stations_clone3, max_workers)
55 .await
56 });
57
58 let (min_temps, max_temps, avg_temps) =
60 tokio::try_join!(min_handle, max_handle, avg_handle)?;
61
62 Ok(TemperatureData {
63 stations: Arc::try_unwrap(stations).unwrap_or_else(|arc| (*arc).clone()),
64 min_temperatures: min_temps?,
65 max_temperatures: max_temps?,
66 avg_temperatures: avg_temps?,
67 })
68 }
69
70 async fn read_temperature_type_parallel_static(
72 dir_path: &Path,
73 stations: Arc<HashMap<u32, StationMetadata>>,
74 _max_workers: usize,
75 ) -> Result<HashMap<(u32, chrono::NaiveDate), TemperatureRecord>> {
76 let file_prefix = match dir_path.file_name().and_then(|f| f.to_str()) {
79 Some("uk_temp_min") => "TN",
80 Some("uk_temp_max") => "TX",
81 Some("uk_temp_avg") => "TG",
82 _ => {
83 return Err(ProcessingError::InvalidFormat(format!(
84 "Unknown temperature directory: {:?}",
85 dir_path
86 )))
87 }
88 };
89
90 let temperature_files: Vec<PathBuf> =
91 Self::find_temperature_files_static(dir_path, &stations, file_prefix)?;
92
93 let all_records: Vec<Vec<TemperatureRecord>> = temperature_files
95 .par_iter()
96 .map(|path| {
97 let reader = TemperatureReader::new();
98 reader.read_temperatures(path)
99 })
100 .collect::<Result<Vec<_>>>()?;
101
102 let mut temperature_map = HashMap::new();
104 for records in all_records {
105 for record in records {
106 temperature_map.insert((record.staid, record.date), record);
107 }
108 }
109
110 Ok(temperature_map)
111 }
112
113 fn find_temperature_files_static(
115 dir_path: &Path,
116 stations: &HashMap<u32, StationMetadata>,
117 file_prefix: &str,
118 ) -> Result<Vec<PathBuf>> {
119 let mut files = Vec::new();
120
121 for entry in std::fs::read_dir(dir_path)? {
123 let entry = entry?;
124 let path = entry.path();
125
126 if path.is_file() {
127 if let Some(file_name) = path.file_name() {
128 let file_name_str = file_name.to_string_lossy();
129
130 if file_name_str.starts_with(&format!("{}_STAID", file_prefix))
132 && file_name_str.ends_with(".txt")
133 {
134 if let Some(staid_str) = file_name_str
136 .strip_prefix(&format!("{}_STAID", file_prefix))
137 .and_then(|s| s.strip_suffix(".txt"))
138 {
139 if let Ok(staid) = staid_str.trim_start_matches('0').parse::<u32>() {
140 if let Some(station) = stations.get(&staid) {
142 if station.is_uk_station() {
143 files.push(path);
144 }
145 }
146 }
147 }
148 }
149 }
150 }
151 }
152
153 Ok(files)
154 }
155
156 pub fn process_station_data(
158 &self,
159 station_id: u32,
160 base_path: &Path,
161 ) -> Result<StationTemperatureData> {
162 let min_path = base_path.join(UK_TEMP_MIN_DIR);
163 let max_path = base_path.join(UK_TEMP_MAX_DIR);
164 let avg_path = base_path.join(UK_TEMP_AVG_DIR);
165
166 let reader = TemperatureReader::new();
167
168 let min_file = min_path.join(format!("TG_STAID{:06}.txt", station_id));
170 let max_file = max_path.join(format!("TG_STAID{:06}.txt", station_id));
171 let avg_file = avg_path.join(format!("TG_STAID{:06}.txt", station_id));
172
173 let min_temps = if min_file.exists() {
174 reader.read_temperatures(&min_file)?
175 } else {
176 Vec::new()
177 };
178
179 let max_temps = if max_file.exists() {
180 reader.read_temperatures(&max_file)?
181 } else {
182 Vec::new()
183 };
184
185 let avg_temps = if avg_file.exists() {
186 reader.read_temperatures(&avg_file)?
187 } else {
188 Vec::new()
189 };
190
191 Ok(StationTemperatureData {
192 station_id,
193 min_temperatures: min_temps,
194 max_temperatures: max_temps,
195 avg_temperatures: avg_temps,
196 })
197 }
198}
199
200impl Default for ConcurrentReader {
201 fn default() -> Self {
202 Self::new(num_cpus::get())
203 }
204}
205
/// Station metadata together with the temperature records loaded from the
/// minimum, maximum and average temperature directories.
#[derive(Debug)]
pub struct TemperatureData {
    /// Station metadata keyed by station id.
    pub stations: HashMap<u32, StationMetadata>,
    /// Records from the minimum-temperature directory, keyed by `(station id, date)`.
    pub min_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
    /// Records from the maximum-temperature directory, keyed by `(station id, date)`.
    pub max_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
    /// Records from the average-temperature directory, keyed by `(station id, date)`.
    pub avg_temperatures: HashMap<(u32, chrono::NaiveDate), TemperatureRecord>,
}
214
/// Temperature records for a single station, grouped by measurement type.
#[derive(Debug)]
pub struct StationTemperatureData {
    /// Id of the station the records were requested for.
    pub station_id: u32,
    /// Records read from the station's file in the minimum-temperature directory;
    /// empty when that file does not exist.
    pub min_temperatures: Vec<TemperatureRecord>,
    /// Records read from the station's file in the maximum-temperature directory;
    /// empty when that file does not exist.
    pub max_temperatures: Vec<TemperatureRecord>,
    /// Records read from the station's file in the average-temperature directory;
    /// empty when that file does not exist.
    pub avg_temperatures: Vec<TemperatureRecord>,
}