ecad_processor/processors/parallel_processor.rs

use crate::error::Result;
use crate::models::{ConsolidatedRecord, StationMetadata};
use crate::processors::{DataMerger, IntegrityChecker, IntegrityReport};
use crate::readers::ConcurrentReader;
use crate::utils::progress::ProgressReporter;
use rayon::prelude::*;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

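/// Coordinates concurrent reading, merging, and integrity checking of
/// temperature data.
///
/// A minimal usage sketch for the async entry point (it assumes the caller
/// provides an async runtime such as Tokio and a data directory; both are
/// illustrative, not prescribed by this module):
///
/// ```ignore
/// let processor = ParallelProcessor::new(8)
///     .with_chunk_size(500)
///     .with_allow_incomplete(true);
/// let (records, report) = processor
///     .process_all_data(Path::new("data/ecad"), None)
///     .await?;
/// ```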
pub struct ParallelProcessor {
    max_workers: usize,
    chunk_size: usize,
    allow_incomplete: bool,
    strict_validation: bool,
}

impl ParallelProcessor {
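    /// Creates a processor with at most `max_workers` workers; `chunk_size`
    /// defaults to 1000, and `allow_incomplete` and `strict_validation`
    /// both start as `false`.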
    pub fn new(max_workers: usize) -> Self {
        Self {
            max_workers,
            chunk_size: 1000,
            allow_incomplete: false,
            strict_validation: false,
        }
    }

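    /// Sets the number of records per batch used by `process_in_batches`.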
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

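    /// Forwards `allow_incomplete` to the `DataMerger` used while merging.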
    pub fn with_allow_incomplete(mut self, allow_incomplete: bool) -> Self {
        self.allow_incomplete = allow_incomplete;
        self
    }

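    /// Forwards `strict_validation` to the `IntegrityChecker` used when
    /// checking the consolidated records.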
    pub fn with_strict_validation(mut self, strict_validation: bool) -> Self {
        self.strict_validation = strict_validation;
        self
    }

    /// Process all temperature data using parallel processing
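    ///
    /// Reads all series concurrently via `ConcurrentReader`, merges them into
    /// consolidated records, and returns the records together with the
    /// integrity report.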
    pub async fn process_all_data(
        &self,
        base_path: &Path,
        progress: Option<&ProgressReporter>,
    ) -> Result<(Vec<ConsolidatedRecord>, IntegrityReport)> {
        if let Some(p) = progress {
            p.set_message("Reading temperature data...");
        }

        // Read all temperature data concurrently
        let reader = ConcurrentReader::new(self.max_workers);
        let temperature_data = reader.read_all_temperature_data(base_path).await?;

        if let Some(p) = progress {
            p.set_message("Merging temperature data...");
        }

        // Merge data into consolidated records
        let merger = DataMerger::with_allow_incomplete(self.allow_incomplete);
        let consolidated_records = merger.merge_temperature_data(&temperature_data)?;

        if let Some(p) = progress {
            p.set_message("Checking data integrity...");
        }

        // Check integrity
        let checker = IntegrityChecker::with_strict_mode(self.strict_validation);
        let integrity_report = checker.check_integrity(&consolidated_records)?;

        if let Some(p) = progress {
            p.finish_with_message("Processing complete");
        }

        Ok((consolidated_records, integrity_report))
    }

    /// Process temperature data by station in parallel
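    ///
    /// Stations are processed on a dedicated Rayon pool sized by
    /// `max_workers`; the merged records are returned sorted by station ID
    /// and date, alongside the integrity report.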
    pub fn process_by_stations(
        &self,
        stations: Vec<StationMetadata>,
        base_path: &Path,
        progress: Option<&ProgressReporter>,
    ) -> Result<(Vec<ConsolidatedRecord>, IntegrityReport)> {
        let total_stations = stations.len();
        let processed_count = Arc::new(AtomicUsize::new(0));

        if let Some(p) = progress {
            p.set_message(&format!("Processing {} stations...", total_stations));
        }

        // Configure Rayon thread pool
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.max_workers)
            .build()
            .map_err(|e| crate::error::ProcessingError::Config(e.to_string()))?;

        // Process stations in parallel
        let all_records: Result<Vec<Vec<ConsolidatedRecord>>> = pool.install(|| {
            stations
                .par_iter()
                .map(|station| {
                    let result = self.process_single_station(station, base_path);

                    // Update progress
                    let count = processed_count.fetch_add(1, Ordering::Relaxed) + 1;
                    if let Some(p) = progress {
                        p.update(count as u64);
                    }

                    result
                })
                .collect()
        });

        let all_records = all_records?;

        // Flatten results
        let mut consolidated_records: Vec<ConsolidatedRecord> =
            all_records.into_iter().flatten().collect();

        // Sort by station ID and date
        consolidated_records.sort_by(|a, b| {
            a.station_id
                .cmp(&b.station_id)
                .then_with(|| a.date.cmp(&b.date))
        });

        if let Some(p) = progress {
            p.set_message("Checking data integrity...");
        }

        // Check integrity
        let checker = IntegrityChecker::with_strict_mode(self.strict_validation);
        let integrity_report = checker.check_integrity(&consolidated_records)?;

        if let Some(p) = progress {
            p.finish_with_message(&format!("Processed {} stations", total_stations));
        }

        Ok((consolidated_records, integrity_report))
    }

    /// Process a single station's data
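    ///
    /// Called for each station from `process_by_stations`; the reader uses a
    /// single worker because parallelism is already applied at the station
    /// level.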
    fn process_single_station(
        &self,
        station: &StationMetadata,
        base_path: &Path,
    ) -> Result<Vec<ConsolidatedRecord>> {
        let reader = ConcurrentReader::new(1);
        let station_data = reader.process_station_data(station.staid, base_path)?;

        let merger = DataMerger::with_allow_incomplete(self.allow_incomplete);
        merger.merge_station_data(
            station,
            station_data.min_temperatures,
            station_data.max_temperatures,
            station_data.avg_temperatures,
        )
    }

    /// Process records in batches for memory efficiency
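    ///
    /// Batches of `chunk_size` records are handed to `batch_processor` from
    /// Rayon worker threads, so the closure must be `Sync + Send`. A minimal
    /// usage sketch (the `write_batch` helper is hypothetical):
    ///
    /// ```ignore
    /// processor.process_in_batches(records, |batch| write_batch(batch), None)?;
    /// ```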
    pub fn process_in_batches<F>(
        &self,
        records: Vec<ConsolidatedRecord>,
        batch_processor: F,
        progress: Option<&ProgressReporter>,
    ) -> Result<()>
    where
        F: Fn(&[ConsolidatedRecord]) -> Result<()> + Sync + Send,
    {
        let total_batches = records.len().div_ceil(self.chunk_size);
        let processed_batches = Arc::new(AtomicUsize::new(0));

        if let Some(p) = progress {
            p.set_message(&format!("Processing {} batches...", total_batches));
        }

        // Process batches in parallel
        records.par_chunks(self.chunk_size).try_for_each(|batch| {
            let result = batch_processor(batch);

            // Update progress
            let count = processed_batches.fetch_add(1, Ordering::Relaxed) + 1;
            if let Some(p) = progress {
                p.update(count as u64);
            }

            result
        })?;

        if let Some(p) = progress {
            p.finish_with_message("Batch processing complete");
        }

        Ok(())
    }
}

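/// Uses one worker per logical CPU, as reported by `num_cpus::get()`.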
impl Default for ParallelProcessor {
    fn default() -> Self {
        Self::new(num_cpus::get())
    }
}