use crate::core::Result;
/// Lightweight handle that remembers how many threads parallel work was
/// configured for.
///
/// The thread count is the only state held; it is exposed through the
/// accessor and configuration methods implemented on this type.
#[derive(Clone, Debug)]
pub struct ParallelProcessor {
    /// Thread count supplied at construction time.
    num_threads: usize,
}
/// Aggregate timing figures for a series of processed tasks.
///
/// `Default` yields an all-zero record (no tasks, no time, zero average).
#[derive(Clone, Debug, Default)]
pub struct ParallelStatistics {
    /// How many tasks have been counted.
    pub tasks_processed: usize,
    /// Accumulated time across the counted tasks, in milliseconds.
    pub total_time_ms: u64,
    /// Mean time per counted task, in milliseconds.
    pub avg_time_per_task_ms: f64,
}
#[derive(Debug, Clone, Default)]
pub struct PerformanceMonitor {
stats: ParallelStatistics,
}
impl PerformanceMonitor {
pub fn new() -> Self {
Self::default()
}
pub fn time_operation<F, T>(&mut self, operation: F) -> Result<T>
where
F: FnOnce() -> Result<T>,
{
let start = std::time::Instant::now();
let result = operation()?;
let elapsed = start.elapsed();
self.stats.tasks_processed += 1;
self.stats.total_time_ms += elapsed.as_millis() as u64;
self.stats.avg_time_per_task_ms =
self.stats.total_time_ms as f64 / self.stats.tasks_processed as f64;
Ok(result)
}
pub fn stats(&self) -> &ParallelStatistics {
&self.stats
}
pub fn get_stats(&self) -> &ParallelStatistics {
&self.stats
}
pub fn average_duration(&self) -> f64 {
self.stats.avg_time_per_task_ms
}
pub fn reset(&mut self) {
self.stats = ParallelStatistics::default();
}
}
impl Default for ParallelProcessor {
    /// Builds a processor whose thread count is the value reported by
    /// `num_cpus::get()`.
    fn default() -> Self {
        let num_threads = num_cpus::get();
        Self { num_threads }
    }
}
impl ParallelProcessor {
pub fn new(num_threads: usize) -> Self {
Self { num_threads }
}
pub fn num_threads(&self) -> usize {
self.num_threads
}
pub fn config(&self) -> ParallelConfig {
ParallelConfig {
num_threads: self.num_threads,
batch_size: 100,
chunk_batch_size: 50,
}
}
pub fn execute_parallel<T, F>(&self, items: Vec<T>, f: F) -> Vec<T>
where
T: Send + Sync,
F: Fn(&T) -> T + Send + Sync,
{
#[cfg(feature = "parallel-processing")]
{
use rayon::prelude::*;
items.par_iter().map(f).collect()
}
#[cfg(not(feature = "parallel-processing"))]
{
items.iter().map(f).collect()
}
}
pub fn get_statistics(&self) -> ParallelStatistics {
ParallelStatistics::default()
}
pub fn should_use_parallel(&self, item_count: usize) -> bool {
item_count > 10 && self.num_threads > 1
}
}
/// Configuration values describing how parallel work is sized.
#[derive(Clone, Debug)]
pub struct ParallelConfig {
    /// Number of threads the work is configured for.
    pub num_threads: usize,
    /// Batch size setting.
    pub batch_size: usize,
    /// Batch size setting for chunks.
    pub chunk_batch_size: usize,
}
/// Initializes rayon's global thread pool with `num_threads` threads.
///
/// Only compiled when the `parallel-processing` feature is enabled.
///
/// # Errors
///
/// Returns `GraphRAGError::Config` carrying the underlying error text when
/// `build_global` fails.
#[cfg(feature = "parallel-processing")]
pub fn configure_thread_pool(num_threads: usize) -> Result<()> {
    let builder = rayon::ThreadPoolBuilder::new().num_threads(num_threads);
    match builder.build_global() {
        Ok(()) => Ok(()),
        Err(e) => Err(crate::core::GraphRAGError::Config {
            message: format!("Failed to configure thread pool: {}", e),
        }),
    }
}
/// No-op stand-in used when the `parallel-processing` feature is disabled.
///
/// The requested thread count is ignored and the call always succeeds, so
/// callers can invoke it unconditionally regardless of the feature set.
#[cfg(not(feature = "parallel-processing"))]
pub fn configure_thread_pool(_num_threads: usize) -> Result<()> {
    // There is no pool to configure in this build.
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicitly sized processor reports its thread count and maps every
    /// item through the supplied closure.
    #[test]
    fn test_parallel_processor() {
        let processor = ParallelProcessor::new(4);
        assert_eq!(processor.num_threads(), 4);

        let items = vec![1, 2, 3, 4, 5];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6, 8, 10]);
    }

    /// A default processor exposes a configuration that mirrors its thread
    /// count and carries the fixed batch sizes, and it can still run work.
    #[test]
    fn test_processor_config() {
        let processor = ParallelProcessor::default();

        let config = processor.config();
        assert_eq!(config.num_threads, processor.num_threads());
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.chunk_batch_size, 50);

        let items = vec![1, 2, 3];
        let results = processor.execute_parallel(items, |x| x * 2);
        assert_eq!(results, vec![2, 4, 6]);
    }

    /// The parallelism heuristic needs both more than one thread and more
    /// than ten items.
    #[test]
    fn test_should_use_parallel() {
        let multi = ParallelProcessor::new(4);
        assert!(multi.should_use_parallel(11));
        assert!(!multi.should_use_parallel(10));

        let single = ParallelProcessor::new(1);
        assert!(!single.should_use_parallel(100));
    }
}