graphrag_core/parallel/
mod.rs1use crate::core::Result;
7
/// Lightweight handle describing how much parallelism batch operations
/// should use.
///
/// The only state carried is the requested worker-thread count; the value is
/// cheap to clone and can be printed with `{:?}` for diagnostics.
#[derive(Clone, Debug)]
pub struct ParallelProcessor {
    /// Requested number of worker threads.
    num_threads: usize,
}
13
/// Plain-data summary of timed work.
///
/// `Default` yields an all-zero record, which is also what a freshly created
/// or reset monitor reports.
#[derive(Clone, Debug, Default)]
pub struct ParallelStatistics {
    /// Number of tasks that have been recorded.
    pub tasks_processed: usize,
    /// Total recorded time, in whole milliseconds.
    pub total_time_ms: u64,
    /// Mean recorded time per task, in milliseconds.
    pub avg_time_per_task_ms: f64,
}
24
25#[derive(Debug, Clone, Default)]
27pub struct PerformanceMonitor {
28 stats: ParallelStatistics,
29}
30
31impl PerformanceMonitor {
32 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn time_operation<F, T>(&mut self, operation: F) -> Result<T>
39 where
40 F: FnOnce() -> Result<T>,
41 {
42 let start = std::time::Instant::now();
43 let result = operation()?;
44 let elapsed = start.elapsed();
45
46 self.stats.tasks_processed += 1;
47 self.stats.total_time_ms += elapsed.as_millis() as u64;
48 self.stats.avg_time_per_task_ms =
49 self.stats.total_time_ms as f64 / self.stats.tasks_processed as f64;
50
51 Ok(result)
52 }
53
54 pub fn stats(&self) -> &ParallelStatistics {
56 &self.stats
57 }
58
59 pub fn get_stats(&self) -> &ParallelStatistics {
61 &self.stats
62 }
63
64 pub fn average_duration(&self) -> f64 {
66 self.stats.avg_time_per_task_ms
67 }
68
69 pub fn reset(&mut self) {
71 self.stats = ParallelStatistics::default();
72 }
73}
74
75impl Default for ParallelProcessor {
76 fn default() -> Self {
77 Self {
78 num_threads: num_cpus::get(),
79 }
80 }
81}
82
83impl ParallelProcessor {
84 pub fn new(num_threads: usize) -> Self {
86 Self { num_threads }
87 }
88
89 pub fn num_threads(&self) -> usize {
91 self.num_threads
92 }
93
94 pub fn config(&self) -> ParallelConfig {
96 ParallelConfig {
97 num_threads: self.num_threads,
98 batch_size: 100,
99 chunk_batch_size: 50,
100 }
101 }
102
103 pub fn execute_parallel<T, F>(&self, items: Vec<T>, f: F) -> Vec<T>
105 where
106 T: Send + Sync,
107 F: Fn(&T) -> T + Send + Sync,
108 {
109 #[cfg(feature = "parallel-processing")]
110 {
111 use rayon::prelude::*;
112 items.par_iter().map(f).collect()
113 }
114 #[cfg(not(feature = "parallel-processing"))]
115 {
116 items.iter().map(f).collect()
117 }
118 }
119
120 pub fn get_statistics(&self) -> ParallelStatistics {
122 ParallelStatistics::default()
123 }
124
125 pub fn should_use_parallel(&self, item_count: usize) -> bool {
127 item_count > 10 && self.num_threads > 1
128 }
129}
130
/// Tuning values for parallel work: a thread count plus two batch sizes.
#[derive(Clone, Debug)]
pub struct ParallelConfig {
    /// Number of worker threads.
    pub num_threads: usize,
    /// Number of items per batch.
    pub batch_size: usize,
    /// Number of chunks per batch.
    pub chunk_batch_size: usize,
}
141
/// Configures rayon's global thread pool to use `num_threads` threads.
///
/// Only compiled when the `parallel-processing` feature is enabled.
///
/// # Errors
///
/// Any failure reported by rayon's `build_global` is converted into a
/// `GraphRAGError::Config` carrying rayon's error text. Per rayon's
/// documentation the global pool can only be initialised once, so a repeated
/// call is expected to end up here.
#[cfg(feature = "parallel-processing")]
pub fn configure_thread_pool(num_threads: usize) -> Result<()> {
    let builder = rayon::ThreadPoolBuilder::new().num_threads(num_threads);

    match builder.build_global() {
        Ok(()) => Ok(()),
        Err(e) => Err(crate::core::GraphRAGError::Config {
            message: format!("Failed to configure thread pool: {}", e),
        }),
    }
}
152
153#[cfg(not(feature = "parallel-processing"))]
155pub fn configure_thread_pool(_num_threads: usize) -> Result<()> {
156 Ok(())
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// A processor keeps the requested thread count and maps every item.
    #[test]
    fn test_parallel_processor() {
        let processor = ParallelProcessor::new(4);
        assert_eq!(processor.num_threads(), 4);

        let items = vec![1, 2, 3, 4, 5];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    /// Output order must match input order, also for inputs large enough to
    /// be split across threads, and empty input must yield empty output.
    #[test]
    fn test_execute_parallel_preserves_order() {
        let processor = ParallelProcessor::new(4);

        let items: Vec<i32> = (0..100).collect();
        let expected: Vec<i32> = (0..100).map(|x| x + 1).collect();
        assert_eq!(processor.execute_parallel(items, |x| x + 1), expected);

        let empty: Vec<i32> = Vec::new();
        assert!(processor.execute_parallel(empty, |x| *x).is_empty());
    }

    /// `config()` mirrors the processor's thread count and uses the fixed
    /// default batch sizes; the default processor is still usable for work.
    #[test]
    fn test_processor_config() {
        let processor = ParallelProcessor::default();

        let config = processor.config();
        assert_eq!(config.num_threads, processor.num_threads());
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.chunk_batch_size, 50);

        let items = vec![1, 2, 3];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6]);
    }

    /// Parallelism is only recommended for more than 10 items and more than
    /// one thread.
    #[test]
    fn test_should_use_parallel() {
        let multi = ParallelProcessor::new(4);
        assert!(!multi.should_use_parallel(10));
        assert!(multi.should_use_parallel(11));

        let single = ParallelProcessor::new(1);
        assert!(!single.should_use_parallel(1_000));
    }

    /// The monitor counts successful operations and can be reset.
    #[test]
    fn test_performance_monitor() {
        let mut monitor = PerformanceMonitor::new();
        assert_eq!(monitor.stats().tasks_processed, 0);

        let outcome = monitor.time_operation(|| Ok(21 * 2));
        assert!(matches!(outcome, Ok(42)));
        assert_eq!(monitor.stats().tasks_processed, 1);
        assert_eq!(monitor.get_stats().tasks_processed, 1);
        assert!(monitor.average_duration() >= 0.0);

        monitor.reset();
        assert_eq!(monitor.stats().tasks_processed, 0);
        assert_eq!(monitor.stats().total_time_ms, 0);
    }
}