// Source: graphrag_core/parallel/mod.rs

//! Parallel processing utilities for GraphRAG
//!
//! This module provides parallel processing capabilities using Rayon
//! for improved performance on multi-core systems.

use std::time::{Duration, Instant};

use crate::core::Result;

/// Parallel processor for batch operations.
///
/// The only state it carries is the thread count it was configured with.
#[derive(Debug, Clone)]
pub struct ParallelProcessor {
    /// Thread count this processor was configured with.
    num_threads: usize,
}

/// Statistics about parallel processing performance.
///
/// The derived `Default` yields all-zero statistics.
#[derive(Debug, Clone, Default)]
pub struct ParallelStatistics {
    /// Number of tasks processed
    pub tasks_processed: usize,
    /// Total processing time in milliseconds
    pub total_time_ms: u64,
    /// Average time per task in milliseconds
    pub avg_time_per_task_ms: f64,
}

25/// Performance monitor for parallel operations
26#[derive(Debug, Clone, Default)]
27pub struct PerformanceMonitor {
28    stats: ParallelStatistics,
29}
30
31impl PerformanceMonitor {
32    /// Create a new performance monitor
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Time an operation and record statistics
38    pub fn time_operation<F, T>(&mut self, operation: F) -> Result<T>
39    where
40        F: FnOnce() -> Result<T>,
41    {
42        let start = std::time::Instant::now();
43        let result = operation()?;
44        let elapsed = start.elapsed();
45
46        self.stats.tasks_processed += 1;
47        self.stats.total_time_ms += elapsed.as_millis() as u64;
48        self.stats.avg_time_per_task_ms =
49            self.stats.total_time_ms as f64 / self.stats.tasks_processed as f64;
50
51        Ok(result)
52    }
53
54    /// Get current statistics
55    pub fn stats(&self) -> &ParallelStatistics {
56        &self.stats
57    }
58
59    /// Get statistics (alternative method name)
60    pub fn get_stats(&self) -> &ParallelStatistics {
61        &self.stats
62    }
63
64    /// Get average operation duration
65    pub fn average_duration(&self) -> f64 {
66        self.stats.avg_time_per_task_ms
67    }
68
69    /// Reset statistics
70    pub fn reset(&mut self) {
71        self.stats = ParallelStatistics::default();
72    }
73}
74
75impl Default for ParallelProcessor {
76    fn default() -> Self {
77        Self {
78            num_threads: num_cpus::get(),
79        }
80    }
81}
82
83impl ParallelProcessor {
84    /// Create a new parallel processor with the specified number of threads
85    pub fn new(num_threads: usize) -> Self {
86        Self { num_threads }
87    }
88
89    /// Get the number of threads
90    pub fn num_threads(&self) -> usize {
91        self.num_threads
92    }
93
94    /// Get processor configuration
95    pub fn config(&self) -> ParallelConfig {
96        ParallelConfig {
97            num_threads: self.num_threads,
98            batch_size: 100,
99            chunk_batch_size: 50,
100        }
101    }
102
103    /// Execute work in parallel
104    pub fn execute_parallel<T, F>(&self, items: Vec<T>, f: F) -> Vec<T>
105    where
106        T: Send + Sync,
107        F: Fn(&T) -> T + Send + Sync,
108    {
109        #[cfg(feature = "parallel-processing")]
110        {
111            use rayon::prelude::*;
112            items.par_iter().map(f).collect()
113        }
114        #[cfg(not(feature = "parallel-processing"))]
115        {
116            items.iter().map(f).collect()
117        }
118    }
119
120    /// Get processing statistics
121    pub fn get_statistics(&self) -> ParallelStatistics {
122        ParallelStatistics::default()
123    }
124
125    /// Determine if parallel processing should be used
126    pub fn should_use_parallel(&self, item_count: usize) -> bool {
127        item_count > 10 && self.num_threads > 1
128    }
129}
130
/// Configuration for parallel processing.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of threads to use
    pub num_threads: usize,
    /// Batch size for processing
    pub batch_size: usize,
    /// Chunk batch size for parallel processing
    pub chunk_batch_size: usize,
}

142/// Configure Rayon thread pool
143#[cfg(feature = "parallel-processing")]
144pub fn configure_thread_pool(num_threads: usize) -> Result<()> {
145    rayon::ThreadPoolBuilder::new()
146        .num_threads(num_threads)
147        .build_global()
148        .map_err(|e| crate::core::GraphRAGError::Config {
149            message: format!("Failed to configure thread pool: {}", e)
150        })
151}
152
153/// No-op when parallel processing is disabled
154#[cfg(not(feature = "parallel-processing"))]
155pub fn configure_thread_pool(_num_threads: usize) -> Result<()> {
156    Ok(())
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicitly sized processor maps every item, preserving input order.
    #[test]
    fn test_parallel_processor() {
        let processor = ParallelProcessor::new(4);
        assert_eq!(processor.num_threads(), 4);

        let items = vec![1, 2, 3, 4, 5];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    /// The default processor works and reports a configuration that matches it.
    #[test]
    fn test_processor_config() {
        let processor = ParallelProcessor::default();

        let config = processor.config();
        assert_eq!(config.num_threads, processor.num_threads());
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.chunk_batch_size, 50);

        let items = vec![1, 2, 3];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6]);
    }

    /// Parallelism is suggested only for more than 10 items and more than one thread.
    #[test]
    fn test_should_use_parallel() {
        let multi = ParallelProcessor::new(4);
        assert!(!multi.should_use_parallel(10));
        assert!(multi.should_use_parallel(11));

        let single = ParallelProcessor::new(1);
        assert!(!single.should_use_parallel(1000));
    }

    /// Successful operations are counted and `reset` clears the statistics.
    #[test]
    fn test_performance_monitor() {
        let mut monitor = PerformanceMonitor::new();
        assert_eq!(monitor.stats().tasks_processed, 0);

        let value = monitor
            .time_operation(|| Ok(21 * 2))
            .expect("operation returns Ok");
        assert_eq!(value, 42);
        assert_eq!(monitor.stats().tasks_processed, 1);
        assert_eq!(monitor.get_stats().tasks_processed, 1);
        assert!(monitor.average_duration() >= 0.0);

        monitor.reset();
        assert_eq!(monitor.stats().tasks_processed, 0);
        assert_eq!(monitor.stats().total_time_ms, 0);
        assert_eq!(monitor.average_duration(), 0.0);
    }
}