1use crate::error::Result;
2use std::sync::mpsc;
3use std::sync::{Arc, Mutex};
4use std::thread;
5
/// Tuning knobs for the parallel execution helpers in this module.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Maximum number of worker threads to spawn.
    pub num_threads: usize,
    /// Number of elements handed to a worker per unit of work; inputs
    /// smaller than this are processed serially.
    pub chunk_size: usize,
    /// Master switch: when false, every helper runs single-threaded.
    pub enable_parallel: bool,
}
13
14impl Default for ParallelConfig {
15 fn default() -> Self {
16 let num_cpus = thread::available_parallelism()
17 .map(|n| n.get())
18 .unwrap_or(1);
19
20 Self {
21 num_threads: num_cpus,
22 chunk_size: 1000,
23 enable_parallel: num_cpus > 1,
24 }
25 }
26}
27
/// Outcome of a parallel operation plus execution metadata.
#[derive(Debug, Clone)]
pub struct ParallelResult<T> {
    /// Per-element (or per-chunk) results, in input order.
    pub results: Vec<T>,
    /// Wall-clock duration of the whole operation, in milliseconds.
    pub execution_time_ms: u64,
    /// Number of worker threads actually used (1 on the serial path).
    pub threads_used: usize,
    /// Number of chunks the input was split into (1 on the serial path).
    pub chunks_processed: usize,
}
36
37pub fn parallel_map<T, U, F>(
39 data: &[T],
40 config: &ParallelConfig,
41 func: F,
42) -> Result<ParallelResult<U>>
43where
44 T: Clone + Send + Sync + 'static,
45 U: Send + Sync + 'static,
46 F: Fn(&T) -> U + Send + Sync + Copy + 'static,
47{
48 let start_time = std::time::Instant::now();
49
50 if !config.enable_parallel || data.len() < config.chunk_size {
51 let results: Vec<U> = data.iter().map(func).collect();
53
54 return Ok(ParallelResult {
55 results,
56 execution_time_ms: start_time.elapsed().as_millis() as u64,
57 threads_used: 1,
58 chunks_processed: 1,
59 });
60 }
61
62 let owned_data: Vec<T> = data.to_vec();
64 let chunks: Vec<Vec<T>> = owned_data
65 .chunks(config.chunk_size)
66 .map(|chunk| chunk.to_vec())
67 .collect();
68
69 let num_chunks = chunks.len();
70 let num_threads = config.num_threads.min(num_chunks);
71
72 let (tx, rx) = mpsc::channel();
73 let chunk_queue = Arc::new(Mutex::new(chunks.into_iter().enumerate()));
74
75 let mut handles = Vec::new();
77 for _ in 0..num_threads {
78 let tx = tx.clone();
79 let queue = Arc::clone(&chunk_queue);
80
81 let handle = thread::spawn(move || {
82 while let Some((chunk_index, chunk)) = {
83 let mut queue = queue.lock().unwrap();
84 queue.next()
85 } {
86 let chunk_results: Vec<U> = chunk.iter().map(func).collect();
87 if tx.send((chunk_index, chunk_results)).is_err() {
88 break;
89 }
90 }
91 });
92
93 handles.push(handle);
94 }
95
96 drop(tx);
98
99 let mut chunk_results: Vec<(usize, Vec<U>)> = Vec::new();
101 while let Ok((chunk_index, results)) = rx.recv() {
102 chunk_results.push((chunk_index, results));
103 }
104
105 for handle in handles {
107 handle
108 .join()
109 .map_err(|_| crate::error::BenfError::ParseError("Thread join failed".to_string()))?;
110 }
111
112 chunk_results.sort_by_key(|(index, _)| *index);
114 let results: Vec<U> = chunk_results
115 .into_iter()
116 .flat_map(|(_, results)| results)
117 .collect();
118
119 Ok(ParallelResult {
120 results,
121 execution_time_ms: start_time.elapsed().as_millis() as u64,
122 threads_used: num_threads,
123 chunks_processed: num_chunks,
124 })
125}
126
127pub fn parallel_reduce<T, U, F, R>(
129 data: &[T],
130 config: &ParallelConfig,
131 map_func: F,
132 reduce_func: R,
133 initial: U,
134) -> Result<ParallelResult<U>>
135where
136 T: Clone + Send + Sync + 'static,
137 U: Clone + Send + Sync + 'static,
138 F: Fn(&T) -> U + Send + Sync + Copy + 'static,
139 R: Fn(U, U) -> U + Send + Sync + 'static,
140{
141 let start_time = std::time::Instant::now();
142
143 if !config.enable_parallel || data.len() < config.chunk_size {
144 let result = data.iter().map(map_func).fold(initial, &reduce_func);
146
147 return Ok(ParallelResult {
148 results: vec![result],
149 execution_time_ms: start_time.elapsed().as_millis() as u64,
150 threads_used: 1,
151 chunks_processed: 1,
152 });
153 }
154
155 let map_result = parallel_map(data, config, map_func)?;
157
158 let final_result = map_result.results.into_iter().fold(initial, reduce_func);
160
161 Ok(ParallelResult {
162 results: vec![final_result],
163 execution_time_ms: start_time.elapsed().as_millis() as u64,
164 threads_used: map_result.threads_used,
165 chunks_processed: map_result.chunks_processed,
166 })
167}
168
169pub fn parallel_statistics(
171 data: &[f64],
172 config: &ParallelConfig,
173) -> Result<ParallelResult<StatisticsChunk>> {
174 let start_time = std::time::Instant::now();
175
176 if !config.enable_parallel || data.len() < config.chunk_size {
177 let stats = calculate_chunk_statistics(data);
178 return Ok(ParallelResult {
179 results: vec![stats],
180 execution_time_ms: start_time.elapsed().as_millis() as u64,
181 threads_used: 1,
182 chunks_processed: 1,
183 });
184 }
185
186 let owned_data: Vec<f64> = data.to_vec();
188 let chunks: Vec<Vec<f64>> = owned_data
189 .chunks(config.chunk_size)
190 .map(|chunk| chunk.to_vec())
191 .collect();
192
193 let chunk_stats = parallel_map(&chunks, config, |chunk| calculate_chunk_statistics(chunk))?;
195
196 Ok(ParallelResult {
197 results: chunk_stats.results,
198 execution_time_ms: start_time.elapsed().as_millis() as u64,
199 threads_used: chunk_stats.threads_used,
200 chunks_processed: chunk_stats.chunks_processed,
201 })
202}
203
/// Partial statistics computed over one chunk of input values.
///
/// Chunks can be merged with [`combine_statistics_chunks`]: all fields are
/// either additive (`count`, `sum`, `sum_squares`, `first_digit_counts`)
/// or combine via min/max.
#[derive(Debug, Clone)]
pub struct StatisticsChunk {
    /// Number of values in the chunk.
    pub count: usize,
    /// Sum of the values.
    pub sum: f64,
    /// Sum of squared values (enables variance without a second pass).
    pub sum_squares: f64,
    /// Minimum value (`f64::INFINITY` for an empty chunk).
    pub min: f64,
    /// Maximum value (`f64::NEG_INFINITY` for an empty chunk).
    pub max: f64,
    /// Counts of leading digits 1..=9; index 0 holds digit 1. Only values
    /// with absolute value >= 1.0 are counted.
    pub first_digit_counts: [usize; 9],
}
214
215fn calculate_chunk_statistics(data: &[f64]) -> StatisticsChunk {
217 if data.is_empty() {
218 return StatisticsChunk {
219 count: 0,
220 sum: 0.0,
221 sum_squares: 0.0,
222 min: f64::INFINITY,
223 max: f64::NEG_INFINITY,
224 first_digit_counts: [0; 9],
225 };
226 }
227
228 let mut sum = 0.0;
229 let mut sum_squares = 0.0;
230 let mut min_val = f64::INFINITY;
231 let mut max_val = f64::NEG_INFINITY;
232 let mut first_digit_counts = [0; 9];
233
234 for &value in data {
235 sum += value;
236 sum_squares += value * value;
237 min_val = min_val.min(value);
238 max_val = max_val.max(value);
239
240 let abs_value = value.abs();
242 if abs_value >= 1.0 {
243 let first_digit = get_first_digit(abs_value);
244 if (1..=9).contains(&first_digit) {
245 first_digit_counts[first_digit - 1] += 1;
246 }
247 }
248 }
249
250 StatisticsChunk {
251 count: data.len(),
252 sum,
253 sum_squares,
254 min: min_val,
255 max: max_val,
256 first_digit_counts,
257 }
258}
259
/// Returns the leading decimal digit of `value`; callers are expected to
/// pass a finite number >= 1.0.
///
/// Non-finite inputs (infinity, NaN) return 0 so the caller's `1..=9`
/// range check rejects them — without this guard an infinite value would
/// spin forever in the divide-by-ten loop. Inputs below 1.0 truncate to 0.
fn get_first_digit(value: f64) -> usize {
    // Guard: `INFINITY >= 10.0` is always true, so the loop below would
    // never terminate. NaN compares false everywhere and `NaN as usize`
    // is 0, but returning early keeps the contract explicit.
    if !value.is_finite() {
        return 0;
    }

    let mut n = value;
    while n >= 10.0 {
        n /= 10.0;
    }
    n as usize
}
268
269pub fn combine_statistics_chunks(chunks: &[StatisticsChunk]) -> StatisticsChunk {
271 if chunks.is_empty() {
272 return StatisticsChunk {
273 count: 0,
274 sum: 0.0,
275 sum_squares: 0.0,
276 min: f64::INFINITY,
277 max: f64::NEG_INFINITY,
278 first_digit_counts: [0; 9],
279 };
280 }
281
282 let mut combined = StatisticsChunk {
283 count: 0,
284 sum: 0.0,
285 sum_squares: 0.0,
286 min: f64::INFINITY,
287 max: f64::NEG_INFINITY,
288 first_digit_counts: [0; 9],
289 };
290
291 for chunk in chunks {
292 combined.count += chunk.count;
293 combined.sum += chunk.sum;
294 combined.sum_squares += chunk.sum_squares;
295 combined.min = combined.min.min(chunk.min);
296 combined.max = combined.max.max(chunk.max);
297
298 for i in 0..9 {
299 combined.first_digit_counts[i] += chunk.first_digit_counts[i];
300 }
301 }
302
303 combined
304}
305
306pub fn parallel_benford_analysis(
308 data: &[f64],
309 config: &ParallelConfig,
310) -> Result<ParallelResult<BenfordChunkResult>> {
311 let start_time = std::time::Instant::now();
312
313 if !config.enable_parallel || data.len() < config.chunk_size {
314 let result = analyze_benford_chunk(data);
315 return Ok(ParallelResult {
316 results: vec![result],
317 execution_time_ms: start_time.elapsed().as_millis() as u64,
318 threads_used: 1,
319 chunks_processed: 1,
320 });
321 }
322
323 let owned_data: Vec<f64> = data.to_vec();
325 let chunks: Vec<Vec<f64>> = owned_data
326 .chunks(config.chunk_size)
327 .map(|chunk| chunk.to_vec())
328 .collect();
329
330 let chunk_results = parallel_map(&chunks, config, |chunk| analyze_benford_chunk(chunk))?;
331
332 Ok(ParallelResult {
333 results: chunk_results.results,
334 execution_time_ms: start_time.elapsed().as_millis() as u64,
335 threads_used: chunk_results.threads_used,
336 chunks_processed: chunk_results.chunks_processed,
337 })
338}
339
/// First-digit distribution of one chunk, for Benford's-law analysis.
#[derive(Debug, Clone)]
pub struct BenfordChunkResult {
    /// Counts of leading digits 1..=9; index 0 holds digit 1.
    pub first_digit_counts: [usize; 9],
    /// Number of values that contributed a digit (|x| >= 1.0) — note this
    /// is NOT the raw chunk length.
    pub total_count: usize,
    /// Mean absolute deviation, in percentage points, of the observed
    /// digit distribution from the Benford expectation.
    pub chunk_mad: f64,
}
347
348fn analyze_benford_chunk(data: &[f64]) -> BenfordChunkResult {
350 let stats = calculate_chunk_statistics(data);
351
352 let expected_proportions = [
354 30.103, 17.609, 12.494, 9.691, 7.918, 6.695, 5.799, 5.115, 4.576,
355 ];
356
357 let mut mad = 0.0;
359 let total_valid = stats.first_digit_counts.iter().sum::<usize>();
360
361 if total_valid > 0 {
362 for (i, &expected) in expected_proportions.iter().enumerate() {
363 let observed_prop = (stats.first_digit_counts[i] as f64 / total_valid as f64) * 100.0;
364 mad += (observed_prop - expected).abs();
365 }
366 mad /= 9.0;
367 }
368
369 BenfordChunkResult {
370 first_digit_counts: stats.first_digit_counts,
371 total_count: total_valid,
372 chunk_mad: mad,
373 }
374}
375
376pub fn parallel_outlier_detection(
378 data: &[f64],
379 config: &ParallelConfig,
380 z_threshold: f64,
381) -> Result<ParallelResult<Vec<(usize, f64, f64)>>> {
382 let start_time = std::time::Instant::now();
383
384 let overall_stats = parallel_statistics(data, config)?;
386 let combined_stats = combine_statistics_chunks(&overall_stats.results);
387
388 let mean = combined_stats.sum / combined_stats.count as f64;
389 let variance = (combined_stats.sum_squares / combined_stats.count as f64) - (mean * mean);
390 let std_dev = variance.sqrt();
391
392 if !config.enable_parallel || data.len() < config.chunk_size {
393 let outliers = detect_outliers_chunk(data, 0, mean, std_dev, z_threshold);
394 return Ok(ParallelResult {
395 results: vec![outliers],
396 execution_time_ms: start_time.elapsed().as_millis() as u64,
397 threads_used: 1,
398 chunks_processed: 1,
399 });
400 }
401
402 let owned_data: Vec<f64> = data.to_vec();
404 let chunks_with_offset: Vec<(usize, Vec<f64>)> = owned_data
405 .chunks(config.chunk_size)
406 .enumerate()
407 .map(|(chunk_idx, chunk)| (chunk_idx * config.chunk_size, chunk.to_vec()))
408 .collect();
409
410 let outlier_results = parallel_map(&chunks_with_offset, config, move |(offset, chunk)| {
411 detect_outliers_chunk(chunk, *offset, mean, std_dev, z_threshold)
412 })?;
413
414 Ok(ParallelResult {
415 results: outlier_results.results,
416 execution_time_ms: start_time.elapsed().as_millis() as u64,
417 threads_used: outlier_results.threads_used,
418 chunks_processed: outlier_results.chunks_processed,
419 })
420}
421
/// Scans one chunk for outliers under a z-score test.
///
/// Returns `(global_index, value, z_score)` for every value whose |z|
/// exceeds `z_threshold`, where `global_index = offset + local position`.
/// A zero standard deviation yields no outliers (every z-score would be
/// undefined).
fn detect_outliers_chunk(
    data: &[f64],
    offset: usize,
    mean: f64,
    std_dev: f64,
    z_threshold: f64,
) -> Vec<(usize, f64, f64)> {
    if std_dev == 0.0 {
        return Vec::new();
    }

    let mut outliers = Vec::new();
    for (local_idx, &value) in data.iter().enumerate() {
        let z = (value - mean) / std_dev;
        if z.abs() > z_threshold {
            outliers.push((offset + local_idx, value, z));
        }
    }
    outliers
}
446
/// Serial-vs-parallel timing comparison produced by
/// [`benchmark_parallel_performance`].
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Duration of the single-threaded baseline run, in milliseconds.
    pub serial_time_ms: u64,
    /// Duration of the parallel run, in milliseconds.
    pub parallel_time_ms: u64,
    /// Ratio of serial to parallel time (> 1.0 means parallel was faster).
    pub speedup: f64,
    /// Speedup divided by thread count (1.0 = perfect linear scaling).
    pub efficiency: f64,
    /// Number of threads used by the parallel run.
    pub threads_used: usize,
}
456
457pub fn benchmark_parallel_performance<T, U, F>(
459 data: &[T],
460 config: &ParallelConfig,
461 func: F,
462) -> Result<PerformanceMetrics>
463where
464 T: Clone + Send + Sync + 'static,
465 U: Send + Sync + 'static,
466 F: Fn(&T) -> U + Send + Sync + Copy + 'static,
467{
468 let serial_start = std::time::Instant::now();
470 let _serial_result: Vec<U> = data.iter().map(func).collect();
471 let serial_time = serial_start.elapsed().as_millis() as u64;
472
473 let parallel_result = parallel_map(data, config, func)?;
475 let parallel_time = parallel_result.execution_time_ms;
476
477 let speedup = serial_time as f64 / parallel_time as f64;
478 let efficiency = speedup / parallel_result.threads_used as f64;
479
480 Ok(PerformanceMetrics {
481 serial_time_ms: serial_time,
482 parallel_time_ms: parallel_time,
483 speedup,
484 efficiency,
485 threads_used: parallel_result.threads_used,
486 })
487}