use numrs2::parallel_optimize::{
adaptive_threshold, optimize_parallel_computation, optimize_scheduling, partition_workload,
ParallelConfig, ParallelizationThreshold, SchedulingStrategy, WorkloadPartitioning,
};
use scirs2_core::parallel_ops::*;
use std::time::Instant;
fn main() {
println!("NumRS Parallel Processing Optimization Example");
println!("============================================\n");
println!("1. Parallelization Threshold Optimization");
println!("-----------------------------------------");
let size = 2_000_000;
let array_data: Vec<f64> = (0..size).map(|i| i as f64).collect();
println!("Testing different parallelization thresholds with element cost = 1.0:");
let fixed_high = 1_000_000;
let time_fixed_high =
benchmark_parallel_sum(&array_data, ParallelizationThreshold::Fixed(fixed_high));
println!(" Fixed threshold ({}): {:?}", fixed_high, time_fixed_high);
let fixed_low = 1_000;
let time_fixed_low =
benchmark_parallel_sum(&array_data, ParallelizationThreshold::Fixed(fixed_low));
println!(" Fixed threshold ({}): {:?}", fixed_low, time_fixed_low);
let adaptive = adaptive_threshold(size, 1.0);
let time_adaptive = benchmark_parallel_sum(&array_data, ParallelizationThreshold::Adaptive);
println!(" Adaptive threshold ({}): {:?}", adaptive, time_adaptive);
let best_time = time_fixed_low.min(time_adaptive).min(time_fixed_high);
println!(
" Best threshold provides a {:.2}x speedup over worst threshold",
time_fixed_high
.max(time_fixed_low)
.max(time_adaptive)
.as_secs_f64()
/ best_time.as_secs_f64()
);
println!("\n2. Scheduling Strategy Optimization");
println!("---------------------------------");
println!("Testing different scheduling strategies:");
let element_cost = 1.0;
let num_threads = scirs2_core::parallel_ops::num_threads();
let static_threads =
optimize_scheduling(size, element_cost, SchedulingStrategy::Static, num_threads);
let time_static = benchmark_scheduling(&array_data, SchedulingStrategy::Static);
println!(
" Static scheduling (threads={}): {:?}",
static_threads, time_static
);
let dynamic_threads =
optimize_scheduling(size, element_cost, SchedulingStrategy::Dynamic, num_threads);
let time_dynamic = benchmark_scheduling(&array_data, SchedulingStrategy::Dynamic);
println!(
" Dynamic scheduling (threads={}): {:?}",
dynamic_threads, time_dynamic
);
let guided_threads =
optimize_scheduling(size, element_cost, SchedulingStrategy::Guided, num_threads);
let time_guided = benchmark_scheduling(&array_data, SchedulingStrategy::Guided);
println!(
" Guided scheduling (threads={}): {:?}",
guided_threads, time_guided
);
let ws_threads = optimize_scheduling(
size,
element_cost,
SchedulingStrategy::WorkStealing,
num_threads,
);
let time_ws = benchmark_scheduling(&array_data, SchedulingStrategy::WorkStealing);
println!(
" Work-stealing scheduling (threads={}): {:?}",
ws_threads, time_ws
);
let adaptive_threads = optimize_scheduling(
size,
element_cost,
SchedulingStrategy::Adaptive,
num_threads,
);
let time_adaptive = benchmark_scheduling(&array_data, SchedulingStrategy::Adaptive);
println!(
" Adaptive scheduling (threads={}): {:?}",
adaptive_threads, time_adaptive
);
let best_sched_time = time_static
.min(time_dynamic)
.min(time_guided)
.min(time_ws)
.min(time_adaptive);
println!(
" Best scheduling strategy provides a {:.2}x speedup over worst strategy",
time_static
.max(time_dynamic)
.max(time_guided)
.max(time_ws)
.max(time_adaptive)
.as_secs_f64()
/ best_sched_time.as_secs_f64()
);
println!("\n3. Workload Partitioning Optimization");
println!("-----------------------------------");
println!("Testing different workload partitioning strategies:");
let time_equal = benchmark_partitioning(&array_data, WorkloadPartitioning::EqualChunks);
println!(" Equal chunks: {:?}", time_equal);
let time_variable = benchmark_partitioning(&array_data, WorkloadPartitioning::VariableChunks);
println!(" Variable chunks: {:?}", time_variable);
let time_pot = benchmark_partitioning(&array_data, WorkloadPartitioning::PowerOfTwoChunks);
println!(" Power-of-two chunks: {:?}", time_pot);
let time_cache =
benchmark_partitioning(&array_data, WorkloadPartitioning::CacheOptimizedChunks);
println!(" Cache-optimized chunks: {:?}", time_cache);
let time_dynamic =
benchmark_partitioning(&array_data, WorkloadPartitioning::DynamicPartitioning);
println!(" Dynamic partitioning: {:?}", time_dynamic);
let best_part_time = time_equal
.min(time_variable)
.min(time_pot)
.min(time_cache)
.min(time_dynamic);
println!(
" Best partitioning strategy provides a {:.2}x speedup over worst strategy",
time_equal
.max(time_variable)
.max(time_pot)
.max(time_cache)
.max(time_dynamic)
.as_secs_f64()
/ best_part_time.as_secs_f64()
);
println!("\n4. Combined Parallel Optimization");
println!("-------------------------------");
let size = 5_000_000;
let element_cost = 2.0;
let array_data: Vec<f64> = (0..size).map(|i| i as f64).collect();
println!("Testing different optimization configurations:");
let time_default = benchmark_with_config(&array_data, ParallelConfig::default());
println!(" Default configuration: {:?}", time_default);
let optimized_config = ParallelConfig::optimized(size, element_cost);
let time_optimized = benchmark_with_config(&array_data, optimized_config);
println!(" Optimized configuration: {:?}", time_optimized);
println!(
" Minimum parallel size: {}",
optimized_config.min_parallel_size
);
println!(" Chunk size: {}", optimized_config.chunk_size);
println!(
" Scheduling strategy: {:?}",
optimized_config.scheduling_strategy
);
let tuned_config = ParallelConfig::default()
.with_min_size(adaptive_threshold(size, element_cost))
.with_chunk_size(size / 64)
.with_scheduling(SchedulingStrategy::WorkStealing);
let time_tuned = benchmark_with_config(&array_data, tuned_config);
println!(" Fine-tuned configuration: {:?}", time_tuned);
let best_config_time = time_default.min(time_optimized).min(time_tuned);
println!(
" Best configuration provides a {:.2}x speedup over default",
time_default.as_secs_f64() / best_config_time.as_secs_f64()
);
println!("\n5. Adapting to Different Workloads");
println!("---------------------------------");
println!("Testing adaptation to different workloads:");
let small_size = 10_000;
let _small_data: Vec<f64> = (0..small_size).map(|i| i as f64).collect();
let optimal_threads_small =
optimize_parallel_computation(small_size, 0.1, SchedulingStrategy::Adaptive);
println!(" Small workload (size={}, cost=0.1):", small_size);
println!(" Optimal threads: {}", optimal_threads_small);
println!(
" Parallel execution recommended: {}",
optimal_threads_small > 1
);
let medium_size = 100_000;
let _medium_data: Vec<f64> = (0..medium_size).map(|i| i as f64).collect();
let optimal_threads_medium =
optimize_parallel_computation(medium_size, 1.0, SchedulingStrategy::Adaptive);
println!(" Medium workload (size={}, cost=1.0):", medium_size);
println!(" Optimal threads: {}", optimal_threads_medium);
println!(
" Parallel execution recommended: {}",
optimal_threads_medium > 1
);
let large_size = 1_000_000;
let _large_data: Vec<f64> = (0..large_size).map(|i| i as f64).collect();
let optimal_threads_large =
optimize_parallel_computation(large_size, 5.0, SchedulingStrategy::Adaptive);
println!(" Large workload (size={}, cost=5.0):", large_size);
println!(" Optimal threads: {}", optimal_threads_large);
println!(
" Parallel execution recommended: {}",
optimal_threads_large > 1
);
let very_large_size = 10_000_000;
let optimal_threads_very_large =
optimize_parallel_computation(very_large_size, 0.5, SchedulingStrategy::Adaptive);
println!(
" Very large workload (size={}, cost=0.5):",
very_large_size
);
println!(" Optimal threads: {}", optimal_threads_very_large);
println!(
" Parallel execution recommended: {}",
optimal_threads_very_large > 1
);
}
/// Times a summation of `compute_with_cost(x, 1.0)` over `data`.
///
/// The summation runs sequentially when `data.len()` is at or below the resolved
/// cutoff, and via `par_iter` otherwise. A `Fixed(n)` policy uses `n` as the
/// cutoff; every other policy falls back to `adaptive_threshold(len, 1.0)`.
fn benchmark_parallel_sum(
    data: &[f64],
    threshold_type: ParallelizationThreshold,
) -> std::time::Duration {
    let timer = Instant::now();
    let n = data.len();
    let cost = 1.0;

    let cutoff = if let ParallelizationThreshold::Fixed(value) = threshold_type {
        value
    } else {
        adaptive_threshold(n, cost)
    };

    let total: f64 = if n > cutoff {
        data.par_iter().map(|&x| compute_with_cost(x, cost)).sum()
    } else {
        data.iter().map(|&x| compute_with_cost(x, cost)).sum()
    };

    let elapsed = timer.elapsed();
    // Observable use of the result so the work cannot be optimised away.
    if total < 0.0 {
        println!("Sum is negative (should never happen): {}", total);
    }
    elapsed
}
/// Times one parallel summation of `data` (element cost 1.0) under `strategy`.
///
/// The thread count comes from `optimize_scheduling`. A dedicated pool of that
/// size is built *before* the timer starts, so the returned duration covers the
/// summation only, not thread start-up.
///
/// How each strategy maps onto the summation:
/// - `Static`: one contiguous chunk per thread.
/// - `Dynamic`: fixed 1000-element chunks.
/// - `Guided`: element iterator with `with_min_len(100)`.
/// - `WorkStealing`: plain `par_iter`.
/// - `Adaptive`: the `Static` layout below 100 000 elements, otherwise the `Guided` one.
///
/// # Panics
/// Panics if the thread pool cannot be built.
fn benchmark_scheduling(data: &[f64], strategy: SchedulingStrategy) -> std::time::Duration {
    // Sums `data` in parallel over fixed-size chunks. `chunk_size` is clamped to
    // at least 1 because `slice::chunks(0)` panics.
    fn chunked_sum(data: &[f64], chunk_size: usize, element_cost: f64) -> f64 {
        let chunks: Vec<_> = data.chunks(chunk_size.max(1)).collect();
        chunks
            .par_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|&x| compute_with_cost(x, element_cost))
                    .sum::<f64>()
            })
            .sum::<f64>()
    }

    let size = data.len();
    let element_cost = 1.0;
    let num_threads = scirs2_core::parallel_ops::num_threads();
    let optimal_threads = optimize_scheduling(size, element_cost, strategy, num_threads);
    let pool = scirs2_core::parallel_ops::ThreadPoolBuilder::new()
        .num_threads(optimal_threads)
        .build()
        .expect("failed to build benchmark thread pool");
    // One chunk per thread. Guard against a zero thread count, because
    // `div_ceil` panics on a zero divisor.
    let per_thread_chunk = size.div_ceil(optimal_threads.max(1));

    let start = Instant::now();
    let sum = pool.install(|| match strategy {
        SchedulingStrategy::Static => chunked_sum(data, per_thread_chunk, element_cost),
        SchedulingStrategy::Dynamic => chunked_sum(data, 1000, element_cost),
        SchedulingStrategy::Adaptive if size < 100_000 => {
            chunked_sum(data, per_thread_chunk, element_cost)
        }
        SchedulingStrategy::Guided | SchedulingStrategy::Adaptive => data
            .par_iter()
            .with_min_len(100)
            .map(|&x| compute_with_cost(x, element_cost))
            .sum::<f64>(),
        SchedulingStrategy::WorkStealing => data
            .par_iter()
            .map(|&x| compute_with_cost(x, element_cost))
            .sum::<f64>(),
    });
    let duration = start.elapsed();

    // Observable use of the result so the work cannot be optimised away.
    if sum < 0.0 {
        println!("Sum is negative (should never happen): {}", sum);
    }
    duration
}
/// Times a parallel summation of `data` split according to `partitioning`.
///
/// Partitions come from `partition_workload(len, partitioning, 0)`.
/// NOTE(review): the meaning of the trailing `0` is defined by numrs2
/// (presumably "use the default/auto value") — confirm.
/// Each partition is summed sequentially (element cost 1.0), and the per-partition
/// sums are combined in parallel. Computing the partitions is included in the
/// measured time.
fn benchmark_partitioning(data: &[f64], partitioning: WorkloadPartitioning) -> std::time::Duration {
    let start = Instant::now();
    let size = data.len();
    let element_cost = 1.0;
    let partitions = partition_workload(size, partitioning, 0);
    let sum: f64 = partitions
        .par_iter()
        .map(|range| {
            // Slice once per partition: one bounds check, then a plain iterator
            // instead of a checked `data[i]` per element.
            data[range.clone()]
                .iter()
                .map(|&x| compute_with_cost(x, element_cost))
                .sum::<f64>()
        })
        .sum::<f64>();
    let duration = start.elapsed();

    // Observable use of the result so the work cannot be optimised away.
    if sum < 0.0 {
        println!("Sum is negative (should never happen): {}", sum);
    }
    duration
}
/// Times one summation of `data` (element cost 2.0) driven by `config`.
///
/// If `config.should_parallelize(len)` is false, the sum runs sequentially.
/// Otherwise it runs on a dedicated pool sized by `config.optimal_threads`, and
/// that pool is built before the timer starts so only the summation is measured.
///
/// On the parallel path:
/// - `Static` scheduling sums fixed `config.chunk_size` chunks (clamped to at least 1).
/// - Every other strategy uses `with_min_len(config.chunk_size)` on an element iterator.
///
/// # Panics
/// Panics if the thread pool cannot be built.
fn benchmark_with_config(data: &[f64], config: ParallelConfig) -> std::time::Duration {
    let size = data.len();
    let element_cost = 2.0;

    // Decide on (and spawn) the pool up front so thread start-up is not timed.
    let pool = if config.should_parallelize(size) {
        let optimal_threads = config.optimal_threads(size, element_cost);
        Some(
            scirs2_core::parallel_ops::ThreadPoolBuilder::new()
                .num_threads(optimal_threads)
                .build()
                .expect("failed to build benchmark thread pool"),
        )
    } else {
        None
    };
    // `slice::chunks` panics on 0, which e.g. `with_chunk_size(len / 64)` yields
    // for inputs shorter than 64 elements.
    let static_chunk = config.chunk_size.max(1);

    let start = Instant::now();
    let sum = match &pool {
        None => data
            .iter()
            .map(|&x| compute_with_cost(x, element_cost))
            .sum::<f64>(),
        Some(pool) => pool.install(|| match config.scheduling_strategy {
            SchedulingStrategy::Static => {
                let chunks: Vec<_> = data.chunks(static_chunk).collect();
                chunks
                    .par_iter()
                    .map(|chunk| {
                        chunk
                            .iter()
                            .map(|&x| compute_with_cost(x, element_cost))
                            .sum::<f64>()
                    })
                    .sum::<f64>()
            }
            _ => data
                .par_iter()
                .with_min_len(config.chunk_size)
                .map(|&x| compute_with_cost(x, element_cost))
                .sum::<f64>(),
        }),
    };
    let duration = start.elapsed();

    // Observable use of the result so the work cannot be optimised away.
    if sum < 0.0 {
        println!("Sum is negative (should never happen): {}", sum);
    }
    duration
}
/// Synthetic per-element workload whose running time scales with `cost`.
///
/// Applies `acc.sin().cos() + step * 1e-6` for `trunc(cost * 10)` rounds,
/// starting from `value`. The `as usize` cast saturates, so a negative or NaN
/// `cost` yields zero rounds and returns `value` unchanged.
fn compute_with_cost(value: f64, cost: f64) -> f64 {
    let rounds = (cost * 10.0) as usize;
    (0..rounds).fold(value, |acc, step| acc.sin().cos() + (step as f64) * 0.000001)
}