//! lawkit_core/common/parallel.rs

1use crate::error::Result;
2use std::sync::mpsc;
3use std::sync::{Arc, Mutex};
4use std::thread;
5
/// Configuration for parallel processing.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of worker threads to use.
    pub num_threads: usize,
    /// Number of elements handled per work unit.
    pub chunk_size: usize,
    /// Whether parallel execution is enabled at all.
    pub enable_parallel: bool,
}

impl Default for ParallelConfig {
    /// One thread per available CPU core, 1000-element chunks, and
    /// parallelism enabled only when more than one core is available.
    fn default() -> Self {
        // Fall back to a single thread if core detection fails.
        let cores = thread::available_parallelism().map_or(1, |n| n.get());

        Self {
            num_threads: cores,
            chunk_size: 1000,
            enable_parallel: cores > 1,
        }
    }
}
27
/// Result of a parallel computation, together with execution metadata.
#[derive(Debug, Clone)]
pub struct ParallelResult<T> {
    /// Computation results, in input order.
    pub results: Vec<T>,
    /// Wall-clock time of the whole operation in milliseconds.
    pub execution_time_ms: u64,
    /// Number of worker threads actually used (1 for the serial fast path).
    pub threads_used: usize,
    /// Number of chunks the input was split into.
    pub chunks_processed: usize,
}
36
37/// 並列データ処理
38pub fn parallel_map<T, U, F>(
39    data: &[T],
40    config: &ParallelConfig,
41    func: F,
42) -> Result<ParallelResult<U>>
43where
44    T: Clone + Send + Sync + 'static,
45    U: Send + Sync + 'static,
46    F: Fn(&T) -> U + Send + Sync + Copy + 'static,
47{
48    let start_time = std::time::Instant::now();
49
50    if !config.enable_parallel || data.len() < config.chunk_size {
51        // シングルスレッド処理
52        let results: Vec<U> = data.iter().map(func).collect();
53
54        return Ok(ParallelResult {
55            results,
56            execution_time_ms: start_time.elapsed().as_millis() as u64,
57            threads_used: 1,
58            chunks_processed: 1,
59        });
60    }
61
62    // 並列処理 - データを所有版に変換
63    let owned_data: Vec<T> = data.to_vec();
64    let chunks: Vec<Vec<T>> = owned_data
65        .chunks(config.chunk_size)
66        .map(|chunk| chunk.to_vec())
67        .collect();
68
69    let num_chunks = chunks.len();
70    let num_threads = config.num_threads.min(num_chunks);
71
72    let (tx, rx) = mpsc::channel();
73    let chunk_queue = Arc::new(Mutex::new(chunks.into_iter().enumerate()));
74
75    // ワーカースレッドを起動
76    let mut handles = Vec::new();
77    for _ in 0..num_threads {
78        let tx = tx.clone();
79        let queue = Arc::clone(&chunk_queue);
80
81        let handle = thread::spawn(move || {
82            while let Some((chunk_index, chunk)) = {
83                let mut queue = queue.lock().unwrap();
84                queue.next()
85            } {
86                let chunk_results: Vec<U> = chunk.iter().map(func).collect();
87                if tx.send((chunk_index, chunk_results)).is_err() {
88                    break;
89                }
90            }
91        });
92
93        handles.push(handle);
94    }
95
96    // 送信側を閉じる
97    drop(tx);
98
99    // 結果を収集
100    let mut chunk_results: Vec<(usize, Vec<U>)> = Vec::new();
101    while let Ok((chunk_index, results)) = rx.recv() {
102        chunk_results.push((chunk_index, results));
103    }
104
105    // スレッドの完了を待つ
106    for handle in handles {
107        handle
108            .join()
109            .map_err(|_| crate::error::BenfError::ParseError("Thread join failed".to_string()))?;
110    }
111
112    // 結果をソートして結合
113    chunk_results.sort_by_key(|(index, _)| *index);
114    let results: Vec<U> = chunk_results
115        .into_iter()
116        .flat_map(|(_, results)| results)
117        .collect();
118
119    Ok(ParallelResult {
120        results,
121        execution_time_ms: start_time.elapsed().as_millis() as u64,
122        threads_used: num_threads,
123        chunks_processed: num_chunks,
124    })
125}
126
127/// 並列リダクション処理
128pub fn parallel_reduce<T, U, F, R>(
129    data: &[T],
130    config: &ParallelConfig,
131    map_func: F,
132    reduce_func: R,
133    initial: U,
134) -> Result<ParallelResult<U>>
135where
136    T: Clone + Send + Sync + 'static,
137    U: Clone + Send + Sync + 'static,
138    F: Fn(&T) -> U + Send + Sync + Copy + 'static,
139    R: Fn(U, U) -> U + Send + Sync + 'static,
140{
141    let start_time = std::time::Instant::now();
142
143    if !config.enable_parallel || data.len() < config.chunk_size {
144        // シングルスレッド処理
145        let result = data.iter().map(map_func).fold(initial, &reduce_func);
146
147        return Ok(ParallelResult {
148            results: vec![result],
149            execution_time_ms: start_time.elapsed().as_millis() as u64,
150            threads_used: 1,
151            chunks_processed: 1,
152        });
153    }
154
155    // 並列map処理
156    let map_result = parallel_map(data, config, map_func)?;
157
158    // 結果をリダクション
159    let final_result = map_result.results.into_iter().fold(initial, reduce_func);
160
161    Ok(ParallelResult {
162        results: vec![final_result],
163        execution_time_ms: start_time.elapsed().as_millis() as u64,
164        threads_used: map_result.threads_used,
165        chunks_processed: map_result.chunks_processed,
166    })
167}
168
169/// 並列統計計算
170pub fn parallel_statistics(
171    data: &[f64],
172    config: &ParallelConfig,
173) -> Result<ParallelResult<StatisticsChunk>> {
174    let start_time = std::time::Instant::now();
175
176    if !config.enable_parallel || data.len() < config.chunk_size {
177        let stats = calculate_chunk_statistics(data);
178        return Ok(ParallelResult {
179            results: vec![stats],
180            execution_time_ms: start_time.elapsed().as_millis() as u64,
181            threads_used: 1,
182            chunks_processed: 1,
183        });
184    }
185
186    // データを所有版に変換してチャンクごとに分割
187    let owned_data: Vec<f64> = data.to_vec();
188    let chunks: Vec<Vec<f64>> = owned_data
189        .chunks(config.chunk_size)
190        .map(|chunk| chunk.to_vec())
191        .collect();
192
193    // 各チャンクで統計を計算
194    let chunk_stats = parallel_map(&chunks, config, |chunk| calculate_chunk_statistics(chunk))?;
195
196    Ok(ParallelResult {
197        results: chunk_stats.results,
198        execution_time_ms: start_time.elapsed().as_millis() as u64,
199        threads_used: chunk_stats.threads_used,
200        chunks_processed: chunk_stats.chunks_processed,
201    })
202}
203
/// Partial statistics for one chunk of data, mergeable across chunks.
#[derive(Debug, Clone)]
pub struct StatisticsChunk {
    /// Number of values in the chunk.
    pub count: usize,
    /// Sum of all values.
    pub sum: f64,
    /// Sum of squared values (used for variance computation).
    pub sum_squares: f64,
    /// Minimum value (+inf for an empty chunk — neutral element for merging).
    pub min: f64,
    /// Maximum value (-inf for an empty chunk — neutral element for merging).
    pub max: f64,
    pub first_digit_counts: [usize; 9], // leading-digit histogram for Benford's law (index 0 = digit 1)
}
214
215/// チャンク統計を計算
216fn calculate_chunk_statistics(data: &[f64]) -> StatisticsChunk {
217    if data.is_empty() {
218        return StatisticsChunk {
219            count: 0,
220            sum: 0.0,
221            sum_squares: 0.0,
222            min: f64::INFINITY,
223            max: f64::NEG_INFINITY,
224            first_digit_counts: [0; 9],
225        };
226    }
227
228    let mut sum = 0.0;
229    let mut sum_squares = 0.0;
230    let mut min_val = f64::INFINITY;
231    let mut max_val = f64::NEG_INFINITY;
232    let mut first_digit_counts = [0; 9];
233
234    for &value in data {
235        sum += value;
236        sum_squares += value * value;
237        min_val = min_val.min(value);
238        max_val = max_val.max(value);
239
240        // 第一桁をカウント(ベンフォード法則用)
241        let abs_value = value.abs();
242        if abs_value >= 1.0 {
243            let first_digit = get_first_digit(abs_value);
244            if (1..=9).contains(&first_digit) {
245                first_digit_counts[first_digit - 1] += 1;
246            }
247        }
248    }
249
250    StatisticsChunk {
251        count: data.len(),
252        sum,
253        sum_squares,
254        min: min_val,
255        max: max_val,
256        first_digit_counts,
257    }
258}
259
/// Most significant decimal digit of `value`.
///
/// Expects `value >= 1.0`; smaller inputs truncate to 0 and are filtered
/// out by the caller's `(1..=9)` range check.
fn get_first_digit(value: f64) -> usize {
    let mut scaled = value;
    loop {
        // Shift the decimal point left until a single digit remains.
        if scaled < 10.0 {
            return scaled as usize;
        }
        scaled /= 10.0;
    }
}
268
269/// 複数のチャンク統計を結合
270pub fn combine_statistics_chunks(chunks: &[StatisticsChunk]) -> StatisticsChunk {
271    if chunks.is_empty() {
272        return StatisticsChunk {
273            count: 0,
274            sum: 0.0,
275            sum_squares: 0.0,
276            min: f64::INFINITY,
277            max: f64::NEG_INFINITY,
278            first_digit_counts: [0; 9],
279        };
280    }
281
282    let mut combined = StatisticsChunk {
283        count: 0,
284        sum: 0.0,
285        sum_squares: 0.0,
286        min: f64::INFINITY,
287        max: f64::NEG_INFINITY,
288        first_digit_counts: [0; 9],
289    };
290
291    for chunk in chunks {
292        combined.count += chunk.count;
293        combined.sum += chunk.sum;
294        combined.sum_squares += chunk.sum_squares;
295        combined.min = combined.min.min(chunk.min);
296        combined.max = combined.max.max(chunk.max);
297
298        for i in 0..9 {
299            combined.first_digit_counts[i] += chunk.first_digit_counts[i];
300        }
301    }
302
303    combined
304}
305
306/// 並列ベンフォード分析
307pub fn parallel_benford_analysis(
308    data: &[f64],
309    config: &ParallelConfig,
310) -> Result<ParallelResult<BenfordChunkResult>> {
311    let start_time = std::time::Instant::now();
312
313    if !config.enable_parallel || data.len() < config.chunk_size {
314        let result = analyze_benford_chunk(data);
315        return Ok(ParallelResult {
316            results: vec![result],
317            execution_time_ms: start_time.elapsed().as_millis() as u64,
318            threads_used: 1,
319            chunks_processed: 1,
320        });
321    }
322
323    // データを所有版に変換してチャンクごとに分割
324    let owned_data: Vec<f64> = data.to_vec();
325    let chunks: Vec<Vec<f64>> = owned_data
326        .chunks(config.chunk_size)
327        .map(|chunk| chunk.to_vec())
328        .collect();
329
330    let chunk_results = parallel_map(&chunks, config, |chunk| analyze_benford_chunk(chunk))?;
331
332    Ok(ParallelResult {
333        results: chunk_results.results,
334        execution_time_ms: start_time.elapsed().as_millis() as u64,
335        threads_used: chunk_results.threads_used,
336        chunks_processed: chunk_results.chunks_processed,
337    })
338}
339
/// Benford's-law analysis result for a single chunk.
#[derive(Debug, Clone)]
pub struct BenfordChunkResult {
    /// Histogram of leading digits 1-9 (index 0 = digit 1).
    pub first_digit_counts: [usize; 9],
    /// Number of values that contributed a leading digit (|v| >= 1.0).
    pub total_count: usize,
    /// Mean absolute deviation from the Benford distribution, in percentage points.
    pub chunk_mad: f64,
}
347
348/// ベンフォードチャンク分析
349fn analyze_benford_chunk(data: &[f64]) -> BenfordChunkResult {
350    let stats = calculate_chunk_statistics(data);
351
352    // ベンフォード期待値
353    let expected_proportions = [
354        30.103, 17.609, 12.494, 9.691, 7.918, 6.695, 5.799, 5.115, 4.576,
355    ];
356
357    // MAD計算
358    let mut mad = 0.0;
359    let total_valid = stats.first_digit_counts.iter().sum::<usize>();
360
361    if total_valid > 0 {
362        for (i, &expected) in expected_proportions.iter().enumerate() {
363            let observed_prop = (stats.first_digit_counts[i] as f64 / total_valid as f64) * 100.0;
364            mad += (observed_prop - expected).abs();
365        }
366        mad /= 9.0;
367    }
368
369    BenfordChunkResult {
370        first_digit_counts: stats.first_digit_counts,
371        total_count: total_valid,
372        chunk_mad: mad,
373    }
374}
375
376/// 並列異常値検出
377pub fn parallel_outlier_detection(
378    data: &[f64],
379    config: &ParallelConfig,
380    z_threshold: f64,
381) -> Result<ParallelResult<Vec<(usize, f64, f64)>>> {
382    let start_time = std::time::Instant::now();
383
384    // まず全体の統計を計算
385    let overall_stats = parallel_statistics(data, config)?;
386    let combined_stats = combine_statistics_chunks(&overall_stats.results);
387
388    let mean = combined_stats.sum / combined_stats.count as f64;
389    let variance = (combined_stats.sum_squares / combined_stats.count as f64) - (mean * mean);
390    let std_dev = variance.sqrt();
391
392    if !config.enable_parallel || data.len() < config.chunk_size {
393        let outliers = detect_outliers_chunk(data, 0, mean, std_dev, z_threshold);
394        return Ok(ParallelResult {
395            results: vec![outliers],
396            execution_time_ms: start_time.elapsed().as_millis() as u64,
397            threads_used: 1,
398            chunks_processed: 1,
399        });
400    }
401
402    // データを所有版に変換してチャンクごとに分割
403    let owned_data: Vec<f64> = data.to_vec();
404    let chunks_with_offset: Vec<(usize, Vec<f64>)> = owned_data
405        .chunks(config.chunk_size)
406        .enumerate()
407        .map(|(chunk_idx, chunk)| (chunk_idx * config.chunk_size, chunk.to_vec()))
408        .collect();
409
410    let outlier_results = parallel_map(&chunks_with_offset, config, move |(offset, chunk)| {
411        detect_outliers_chunk(chunk, *offset, mean, std_dev, z_threshold)
412    })?;
413
414    Ok(ParallelResult {
415        results: outlier_results.results,
416        execution_time_ms: start_time.elapsed().as_millis() as u64,
417        threads_used: outlier_results.threads_used,
418        chunks_processed: outlier_results.chunks_processed,
419    })
420}
421
/// Return `(global_index, value, z_score)` for every element whose |z|
/// exceeds `z_threshold`.
///
/// `offset` is added to each local index so reported positions are
/// relative to the full dataset, not the chunk.
fn detect_outliers_chunk(
    data: &[f64],
    offset: usize,
    mean: f64,
    std_dev: f64,
    z_threshold: f64,
) -> Vec<(usize, f64, f64)> {
    // Zero spread: no element can deviate, so nothing to report (also
    // avoids dividing by zero below).
    if std_dev == 0.0 {
        return Vec::new();
    }

    let mut outliers = Vec::new();
    for (local_idx, &sample) in data.iter().enumerate() {
        let z = (sample - mean) / std_dev;
        if z.abs() > z_threshold {
            outliers.push((offset + local_idx, sample, z));
        }
    }
    outliers
}
446
/// Timing comparison between serial and parallel execution of the same work.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Serial (single-thread) execution time in milliseconds.
    pub serial_time_ms: u64,
    /// Parallel execution time in milliseconds.
    pub parallel_time_ms: u64,
    /// serial_time / parallel_time — how many times faster the parallel run was.
    pub speedup: f64,
    /// Speedup divided by the number of threads (1.0 = perfect scaling).
    pub efficiency: f64,
    /// Number of worker threads used in the parallel run.
    pub threads_used: usize,
}
456
457/// パフォーマンス比較
458pub fn benchmark_parallel_performance<T, U, F>(
459    data: &[T],
460    config: &ParallelConfig,
461    func: F,
462) -> Result<PerformanceMetrics>
463where
464    T: Clone + Send + Sync + 'static,
465    U: Send + Sync + 'static,
466    F: Fn(&T) -> U + Send + Sync + Copy + 'static,
467{
468    // シリアル実行
469    let serial_start = std::time::Instant::now();
470    let _serial_result: Vec<U> = data.iter().map(func).collect();
471    let serial_time = serial_start.elapsed().as_millis() as u64;
472
473    // 並列実行
474    let parallel_result = parallel_map(data, config, func)?;
475    let parallel_time = parallel_result.execution_time_ms;
476
477    let speedup = serial_time as f64 / parallel_time as f64;
478    let efficiency = speedup / parallel_result.threads_used as f64;
479
480    Ok(PerformanceMetrics {
481        serial_time_ms: serial_time,
482        parallel_time_ms: parallel_time,
483        speedup,
484        efficiency,
485        threads_used: parallel_result.threads_used,
486    })
487}