use scirs2_core::error::CoreResult;
#[cfg(feature = "parallel")]
use scirs2_core::parallel::{
nested_scope, nested_scope_with_limits, with_nested_policy, DataPartitioner, LoadBalancer,
NestedConfig, NestedPolicy, PartitionerConfig, ResourceLimits, SchedulerConfigBuilder,
WorkStealingScheduler,
};
#[cfg(feature = "parallel")]
use scirs2_core::parallel_ops::*;
use std::time::{Duration, Instant};
#[allow(dead_code)]
fn main() -> CoreResult<()> {
#[cfg(not(feature = "parallel"))]
{
println!("This example requires the 'parallel' feature to be enabled.");
println!("Run with: cargo run --example advanced_parallel_processing --features parallel");
return Ok(());
}
#[cfg(feature = "parallel")]
{
println!("=== Advanced Parallel Processing Examples ===\n");
example_custom_partitioning()?;
example_work_stealing_scheduler()?;
example_nested_parallelism()?;
example_load_balancing()?;
example_complex_nested_computation()?;
}
Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Example 1: detects the value distribution of three synthetic datasets
/// (uniform, skewed, Gaussian) and partitions each with a strategy derived
/// from its distribution, printing the resulting partition sizes.
fn example_custom_partitioning() -> CoreResult<()> {
    println!("Example 1: Custom Partitioning Strategies");
    println!("{}", "-".repeat(40));

    // Three synthetic datasets with distinct value distributions.
    let uniform_data: Vec<f64> = (0..10000).map(|i| i as f64).collect();
    let skewed_data: Vec<f64> = (0..10000)
        .map(|i| if i < 1000 { 1.0 } else { (i as f64).powf(2.0) })
        .collect();
    let gaussian_data: Vec<f64> = (0..10000)
        .map(|i| {
            let x = (i as f64 - 5000.0) / 1000.0;
            (-x * x / 2.0).exp()
        })
        .collect();

    // Four partitions with load balancing enabled; the rest of the
    // configuration keeps its defaults.
    let config = PartitionerConfig {
        num_partitions: 4,
        enable_load_balancing: true,
        ..Default::default()
    };
    let partitioner = DataPartitioner::<f64>::new(config);

    // Run the identical analyze -> strategy -> partition pipeline over each
    // dataset instead of repeating it three times.
    let datasets = [
        ("Uniform distribution:", &uniform_data),
        ("\nSkewed distribution:", &skewed_data),
        ("\nGaussian distribution:", &gaussian_data),
    ];
    for (header, data) in datasets {
        println!("{}", header);
        let dist = partitioner.analyze_distribution(data);
        println!(" Detected: {:?}", dist);
        let strategy = partitioner.create_strategy(&dist, data.len())?;
        let partitions = partitioner.partition(data, &strategy)?;
        print_partition_info(&partitions);
    }
    println!();
    Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
fn example_work_stealing_scheduler() -> CoreResult<()> {
println!("Example 2: Work-Stealing Scheduler");
println!("{}", "-".repeat(40));
let config = SchedulerConfigBuilder::new()
.workers(4)
.enable_stealing_heuristics(true)
.enable_priorities(true)
.adaptive(true)
.build();
let mut scheduler = WorkStealingScheduler::new(config);
let start = Instant::now();
for i in 0..5 {
let task = move || -> Result<(), scirs2_core::error::CoreError> {
println!("Task {} executing", i);
std::thread::sleep(Duration::from_millis(50));
Ok(())
};
scheduler.submit_fn(task);
}
for i in 5..15 {
let task = move || -> Result<(), scirs2_core::error::CoreError> {
println!("Task {} executing", i);
std::thread::sleep(Duration::from_millis(100));
Ok(())
};
scheduler.submit_fn(task);
}
std::thread::sleep(Duration::from_secs(2));
let stats = scheduler.stats();
println!("\nScheduler Statistics:");
println!(" Tasks submitted: {}", stats.tasks_submitted);
println!(" Tasks completed: {}", stats.tasks_completed);
println!(" Tasks stolen: {}", stats.successful_steals);
println!(" Failed steals: {}", stats.failed_steals);
println!(" Execution time: {:?}", start.elapsed());
scheduler.shutdown();
println!();
Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Example 3: three levels of nested parallel scopes sharing one resource
/// budget — an outer parallel map whose body opens a second-level scope,
/// which in turn opens a third.
fn example_nested_parallelism() -> CoreResult<()> {
println!("Example 3: Nested Parallelism");
println!("{}", "-".repeat(40));
// Budget for the whole nested tree: 8 threads total, at most 3 levels deep,
// with 4/2/1 threads granted at depths 0/1/2 respectively.
let limits = ResourceLimits {
max_total_threads: 8,
max_nesting_depth: 3,
threads_per_level: vec![4, 2, 1],
..Default::default()
};
let result = nested_scope_with_limits(limits, |outer_scope| {
println!("Outer level (depth 0) - starting nested processing");
let outer_data: Vec<i32> = (0..100).collect();
// Depth 0: parallel map over 100 items; each item opens its own scope.
let outer_results = outer_scope.par_iter(outer_data, |x| {
nested_scope(|inner_scope| {
println!(" Inner level (depth 1) - processing item {}", x);
let inner_data: Vec<i32> = (0..10).collect();
// Depth 1: each of the 10 inner items opens a depth-2 scope that doubles
// 5 values. A failed nested scope degrades to a zero-filled vector
// (via unwrap_or_else) instead of aborting the whole computation.
let inner_sum: i32 = inner_scope
.par_iter(inner_data, |_y| {
nested_scope(|deep_scope| {
let deep_data: Vec<i32> = (0..5).collect();
deep_scope.par_iter(deep_data, |z| z * 2)
})
.unwrap_or_else(|_| vec![0; 5])
.iter()
.sum::<i32>()
})
.unwrap_or_else(|_| vec![0; 10])
.iter()
.sum();
// Result per outer item: the item value scaled by its nested sum.
Ok(x * inner_sum)
})
.unwrap_or(0)
})?;
println!("\nOuter results computed: {} items", outer_results.len());
Ok(outer_results)
})?;
println!("Total result items: {}", result.len());
println!();
Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Example 4: feeds simulated per-partition execution times into a load
/// balancer, then prints the rebalanced weights and the imbalance factor.
fn example_load_balancing() -> CoreResult<()> {
    println!("Example 4: Load Balancing");
    println!("{}", "-".repeat(40));

    // Balancer over 4 partitions; rebalancing triggers past a 1.2x imbalance.
    let mut balancer = LoadBalancer::new(4, 1.2);

    // Simulated execution samples per partition (milliseconds). Partition 1
    // is the slowest, partition 3 the fastest.
    let partition_times: [Vec<u64>; 4] = [
        vec![100, 95, 105, 98],
        vec![200, 190, 210, 195],
        vec![150, 145, 155, 148],
        vec![50, 55, 45, 52],
    ];

    println!("Initial execution times (ms):");
    for (partition, samples) in partition_times.iter().enumerate() {
        println!(" Partition {}: {:?}", partition, samples);
        // NOTE(review): method name is missing the usual snake_case underscore
        // (`record_execution_time`?) — kept as written to match the current API.
        for &ms in samples {
            balancer.recordexecution_time(partition, Duration::from_millis(ms));
        }
    }

    let new_weights = balancer.rebalance();
    println!("\nRebalanced weights:");
    for (partition, weight) in new_weights.iter().enumerate() {
        println!(" Partition {}: {:.2}", partition, weight);
    }

    let imbalance = balancer.get_imbalance_factor();
    println!("\nLoad imbalance factor: {:.2}", imbalance);
    println!();
    Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Example 5: runs the same two-level parallel triple-sum under each nesting
/// policy (Allow / Sequential / Deny) and reports the result or error plus
/// the elapsed time for each.
fn example_complex_nested_computation() -> CoreResult<()> {
    println!("Example 5: Complex Nested Computation");
    println!("{}", "-".repeat(40));

    let policies = vec![
        (NestedPolicy::Allow, "Allow"),
        (NestedPolicy::Sequential, "Sequential"),
        (NestedPolicy::Deny, "Deny"),
    ];
    for (policy, name) in policies {
        println!("\nTesting with {} policy:", name);
        // Small budget (4 threads, depth 2) so the policy differences are
        // actually exercised by the nested par_iter below.
        let config = NestedConfig {
            policy,
            limits: ResourceLimits {
                max_total_threads: 4,
                max_nesting_depth: 2,
                ..Default::default()
            },
            ..Default::default()
        };
        let start = Instant::now();
        let result = with_nested_policy(config, || {
            let size = 50;
            // Triple sum of i*j*k over [0, size)^3: the outer two levels are
            // parallel, the innermost is a plain sequential sum.
            let outer_vec: Vec<usize> = (0..size).collect();
            let outer_sum: i64 = outer_vec
                .into_par_iter()
                .map(|i| {
                    let middle_vec: Vec<usize> = (0..size).collect();
                    let middle_sum: i64 = middle_vec
                        .into_par_iter()
                        .map(|j| {
                            let inner_sum: i64 = (0..size).map(|k| (i * j * k) as i64).sum();
                            inner_sum
                        })
                        .sum();
                    middle_sum
                })
                .sum();
            // Return the sum directly: the original routed it through a
            // needless `let mut result = 0i64; result += outer_sum;`
            // accumulator, which is equivalent but trips clippy.
            Ok(outer_sum)
        });
        match result {
            Ok(value) => {
                println!(" Result: {}", value);
                println!(" Time: {:?}", start.elapsed());
            }
            Err(e) => {
                println!(" Error: {}", e);
            }
        }
    }
    println!();
    Ok(())
}
#[cfg(feature = "parallel")]
#[allow(dead_code)]
/// Prints the size of each partition followed by a min/max/avg load-balance
/// summary; when every partition is non-empty it also prints the max/min
/// imbalance ratio. Prints only the size header for an empty slice.
fn print_partition_info<T>(partitions: &[Vec<T>]) {
    println!(" Partition sizes:");
    for (idx, part) in partitions.iter().enumerate() {
        println!(" Partition {}: {} elements", idx, part.len());
    }

    // Guard clause: nothing to summarize without at least one partition.
    if partitions.is_empty() {
        return;
    }

    let sizes: Vec<usize> = partitions.iter().map(|p| p.len()).collect();
    let smallest = sizes.iter().copied().min().unwrap_or(0);
    let largest = sizes.iter().copied().max().unwrap_or(0);
    let mean = sizes.iter().sum::<usize>() / sizes.len();
    println!(
        " Load balance: min={}, max={}, avg={}",
        smallest, largest, mean
    );
    // Ratio is only meaningful (and division-safe) when no partition is empty.
    if smallest > 0 {
        println!(" Imbalance factor: {:.2}", largest as f64 / smallest as f64);
    }
}