ecad_processor/processors/parallel_processor.rs

use crate::error::Result;
use crate::models::{ConsolidatedRecord, StationMetadata};
use crate::processors::{DataMerger, IntegrityChecker, IntegrityReport};
use crate::readers::ConcurrentReader;
use crate::utils::progress::ProgressReporter;
use rayon::prelude::*;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Coordinates parallel reading, merging, and integrity checking of
/// per-station temperature data.
pub struct ParallelProcessor {
    /// Worker count used for the concurrent reader and the rayon pool.
    max_workers: usize,
    /// Records per batch in `process_in_batches` (default: 1000).
    chunk_size: usize,
    /// Forwarded to `DataMerger::with_allow_incomplete`.
    allow_incomplete: bool,
    /// Forwarded to `IntegrityChecker::with_strict_mode`.
    strict_validation: bool,
}

impl ParallelProcessor {
    /// Creates a processor with `max_workers` threads and default settings
    /// (chunk size 1000, incomplete records rejected, lenient validation).
    pub fn new(max_workers: usize) -> Self {
        Self {
            max_workers,
            chunk_size: 1000,
            allow_incomplete: false,
            strict_validation: false,
        }
    }

    /// Sets the number of records per batch for `process_in_batches`.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Controls whether the merger accepts stations with incomplete data.
    pub fn with_allow_incomplete(mut self, allow_incomplete: bool) -> Self {
        self.allow_incomplete = allow_incomplete;
        self
    }

    /// Controls whether the integrity checker runs in strict mode.
    pub fn with_strict_validation(mut self, strict_validation: bool) -> Self {
        self.strict_validation = strict_validation;
        self
    }

    /// Reads all temperature data under `base_path`, merges it into
    /// consolidated records, and runs the integrity check, reporting progress
    /// when a `ProgressReporter` is supplied.
    pub async fn process_all_data(
        &self,
        base_path: &Path,
        progress: Option<&ProgressReporter>,
    ) -> Result<(Vec<ConsolidatedRecord>, IntegrityReport)> {
        if let Some(p) = progress {
            p.set_message("Reading temperature data...");
        }

        // Read every station's series concurrently.
        let reader = ConcurrentReader::new(self.max_workers);
        let temperature_data = reader.read_all_temperature_data(base_path).await?;

        if let Some(p) = progress {
            p.set_message("Merging temperature data...");
        }

        // Merge the per-station series into consolidated records.
        let merger = DataMerger::with_allow_incomplete(self.allow_incomplete);
        let consolidated_records = merger.merge_temperature_data(&temperature_data)?;

        if let Some(p) = progress {
            p.set_message("Checking data integrity...");
        }

        let checker = IntegrityChecker::with_strict_mode(self.strict_validation);
        let integrity_report = checker.check_integrity(&consolidated_records)?;

        if let Some(p) = progress {
            p.finish_with_message("Processing complete");
        }

        Ok((consolidated_records, integrity_report))
    }

    /// Processes the given stations in parallel on a dedicated rayon pool,
    /// then sorts the merged records by station id and date and runs the
    /// integrity check.
    pub fn process_by_stations(
        &self,
        stations: Vec<StationMetadata>,
        base_path: &Path,
        progress: Option<&ProgressReporter>,
    ) -> Result<(Vec<ConsolidatedRecord>, IntegrityReport)> {
        let total_stations = stations.len();
        let processed_count = Arc::new(AtomicUsize::new(0));

        if let Some(p) = progress {
            p.set_message(&format!("Processing {} stations...", total_stations));
        }

        // Build a dedicated pool so the worker count is bounded by
        // `max_workers` rather than rayon's global default.
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.max_workers)
            .build()
            .map_err(|e| crate::error::ProcessingError::Config(e.to_string()))?;

        // Each station is read and merged independently; an error from any
        // station fails the whole call.
        let all_records: Result<Vec<Vec<ConsolidatedRecord>>> = pool.install(|| {
            stations
                .par_iter()
                .map(|station| {
                    let result = self.process_single_station(station, base_path);

                    let count = processed_count.fetch_add(1, Ordering::Relaxed) + 1;
                    if let Some(p) = progress {
                        p.update(count as u64);
                    }

                    result
                })
                .collect()
        });

        let all_records = all_records?;

        // Flatten the per-station vectors and restore a deterministic order.
        let mut consolidated_records: Vec<ConsolidatedRecord> =
            all_records.into_iter().flatten().collect();

        consolidated_records.sort_by(|a, b| {
            a.station_id
                .cmp(&b.station_id)
                .then_with(|| a.date.cmp(&b.date))
        });

        if let Some(p) = progress {
            p.set_message("Checking data integrity...");
        }

        let checker = IntegrityChecker::with_strict_mode(self.strict_validation);
        let integrity_report = checker.check_integrity(&consolidated_records)?;

        if let Some(p) = progress {
            p.finish_with_message(&format!("Processed {} stations", total_stations));
        }

        Ok((consolidated_records, integrity_report))
    }

    /// Reads and merges the min/max/avg temperature series for a single station.
    fn process_single_station(
        &self,
        station: &StationMetadata,
        base_path: &Path,
    ) -> Result<Vec<ConsolidatedRecord>> {
        // Use a single-worker reader here: parallelism already comes from the
        // station-level rayon pool in `process_by_stations`.
        let reader = ConcurrentReader::new(1);
        let station_data = reader.process_station_data(station.staid, base_path)?;

        let merger = DataMerger::with_allow_incomplete(self.allow_incomplete);
        merger.merge_station_data(
            station,
            station_data.min_temperatures,
            station_data.max_temperatures,
            station_data.avg_temperatures,
        )
    }

    /// Splits `records` into chunks of `chunk_size` and applies
    /// `batch_processor` to the chunks in parallel, stopping at the first error.
    pub fn process_in_batches<F>(
        &self,
        records: Vec<ConsolidatedRecord>,
        batch_processor: F,
        progress: Option<&ProgressReporter>,
    ) -> Result<()>
    where
        F: Fn(&[ConsolidatedRecord]) -> Result<()> + Sync + Send,
    {
        let total_batches = records.len().div_ceil(self.chunk_size);
        let processed_batches = Arc::new(AtomicUsize::new(0));

        if let Some(p) = progress {
            p.set_message(&format!("Processing {} batches...", total_batches));
        }

        records.par_chunks(self.chunk_size).try_for_each(|batch| {
            let result = batch_processor(batch);

            let count = processed_batches.fetch_add(1, Ordering::Relaxed) + 1;
            if let Some(p) = progress {
                p.update(count as u64);
            }

            result
        })?;

        if let Some(p) = progress {
            p.finish_with_message("Batch processing complete");
        }

        Ok(())
    }
}

impl Default for ParallelProcessor {
    fn default() -> Self {
        // Default to one worker per logical CPU (as reported by `num_cpus`).
        Self::new(num_cpus::get())
    }
}
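
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only): a minimal example of how the builder
// methods, the async consolidation pass, and the parallel batch hook might be
// wired together. `example_consolidate` is not part of the crate's API, the
// chunk size is an arbitrary assumption, and the counting closure stands in
// for a real sink (database writer, CSV exporter, ...).
#[allow(dead_code)]
async fn example_consolidate(base_path: &Path) -> Result<()> {
    let processor = ParallelProcessor::new(num_cpus::get())
        .with_chunk_size(5_000)
        .with_strict_validation(true);

    // Read, merge, and validate every series under `base_path`; no progress
    // reporter is attached in this sketch.
    let (records, _report) = processor.process_all_data(base_path, None).await?;

    // Hand the consolidated records to the batch hook in parallel. The closure
    // only counts rows, so it trivially satisfies the `Fn + Sync + Send` bound.
    let written = AtomicUsize::new(0);
    processor.process_in_batches(
        records,
        |batch| {
            written.fetch_add(batch.len(), Ordering::Relaxed);
            Ok(())
        },
        None,
    )
}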