//! Parallel execution utilities for the integrator: a thread pool with a shared
//! global queue and per-worker local queues, NUMA-aware task placement, and
//! chunked, vectorized array operations.
use crate::common::IntegrateFloat;
use crate::error::{IntegrateError, IntegrateResult};
use scirs2_core::ndarray::{s, Array1, Array2, ArrayView2, Axis};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
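/// Coordinates parallel execution of integration workloads: owns the worker thread
/// pool, the detected NUMA topology, and the load-balancing / work-stealing policy.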
pub struct ParallelOptimizer {
pub num_threads: usize,
thread_pool: Option<ThreadPool>,
pub numa_info: NumaTopology,
pub load_balancer: LoadBalancingStrategy,
pub work_stealing_config: WorkStealingConfig,
}
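/// Pool of worker threads sharing a global task queue, with one local queue per worker.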
pub struct ThreadPool {
workers: Vec<Worker>,
task_queue: Arc<Mutex<TaskQueue>>,
shutdown: Arc<AtomicUsize>,
}
struct Worker {
id: usize,
thread: Option<JoinHandle<()>>,
local_queue: Arc<Mutex<LocalTaskQueue>>,
}
struct TaskQueue {
global_tasks: Vec<Box<dyn ParallelTask + Send>>,
pending_tasks: usize,
}
struct LocalTaskQueue {
tasks: Vec<Box<dyn ParallelTask + Send>>,
steals_attempted: usize,
steals_successful: usize,
}
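/// Detected (or heuristically estimated) NUMA layout of the host machine.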
#[derive(Debug, Clone)]
pub struct NumaTopology {
pub num_nodes: usize,
pub cores_per_node: Vec<usize>,
pub bandwidth_per_node: Vec<f64>,
pub inter_node_latency: Vec<Vec<f64>>,
}
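/// Strategy used to distribute tasks across worker threads.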
#[derive(Debug, Clone, Copy)]
pub enum LoadBalancingStrategy {
Static,
Dynamic,
WorkStealing,
NumaAware,
Adaptive,
}
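/// Tuning parameters for the work-stealing scheduler.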
#[derive(Debug, Clone)]
pub struct WorkStealingConfig {
pub max_steal_attempts: usize,
pub steal_ratio: f64,
pub min_steal_size: usize,
pub backoff_strategy: BackoffStrategy,
}
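/// How a worker backs off between unsuccessful steal attempts.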
#[derive(Debug, Clone, Copy)]
pub enum BackoffStrategy {
None,
Linear(Duration),
Exponential { initial: Duration, max: Duration },
RandomJitter { min: Duration, max: Duration },
}
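/// A unit of work that can run on the thread pool and, optionally, be subdivided
/// into smaller tasks for finer-grained load balancing.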
pub trait ParallelTask: Send {
fn execute(&self) -> ParallelResult;
fn estimated_cost(&self) -> f64;
fn can_subdivide(&self) -> bool;
fn subdivide(&self) -> Vec<Box<dyn ParallelTask + Send>>;
fn priority(&self) -> TaskPriority {
TaskPriority::Normal
}
fn preferred_numa_node(&self) -> Option<usize> {
None
}
}
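/// Type-erased result produced by a [`ParallelTask`].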
pub type ParallelResult = IntegrateResult<Box<dyn std::any::Any + Send>>;
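/// Scheduling priority; higher-priority tasks are routed to a reserved subset of workers.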
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
Low = 0,
Normal = 1,
High = 2,
Critical = 3,
}
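/// A data-parallel array computation processed in row chunks; large inputs can be
/// subdivided into per-chunk subtasks.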
pub struct VectorizedComputeTask<F: IntegrateFloat> {
pub input: Array2<F>,
pub operation: VectorOperation<F>,
pub chunk_size: usize,
pub prefer_simd: bool,
}
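/// Operation applied by a [`VectorizedComputeTask`].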
#[derive(Clone)]
pub enum VectorOperation<F: IntegrateFloat> {
ElementWise(ArithmeticOp),
MatrixVector(Array1<F>),
Reduction(ReductionOp),
Custom(Arc<dyn Fn(&ArrayView2<F>) -> Array2<F> + Send + Sync>),
}
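/// Element-wise arithmetic operations.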
#[derive(Debug, Clone, Copy)]
pub enum ArithmeticOp {
Add(f64),
Multiply(f64),
Power(f64),
Exp,
Log,
Sin,
Cos,
}
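/// Reductions that collapse an array to a single value.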
#[derive(Debug, Clone, Copy)]
pub enum ReductionOp {
Sum,
Product,
Max,
Min,
Mean,
Variance,
}
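/// Bookkeeping for NUMA-aware memory placement (per-node usage counters and strategy).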
pub struct NumaAllocator {
node_affinities: Vec<usize>,
memory_usage: Vec<AtomicUsize>,
strategy: NumaAllocationStrategy,
}
#[derive(Debug, Clone, Copy)]
pub enum NumaAllocationStrategy {
FirstTouch,
RoundRobin,
Local,
Interleaved,
}
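/// Summary statistics returned alongside the results of a parallel run.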
#[derive(Debug, Clone)]
pub struct ParallelExecutionStats {
pub total_time: Duration,
pub thread_times: Vec<Duration>,
pub load_balance_efficiency: f64,
pub work_stealing_stats: WorkStealingStats,
pub numa_affinity_hits: usize,
pub cache_performance: CachePerformanceMetrics,
pub simd_utilization: f64,
}
#[derive(Debug, Clone)]
pub struct WorkStealingStats {
pub steal_attempts: usize,
pub successful_steals: usize,
pub success_rate: f64,
pub steal_time_ratio: f64,
}
#[derive(Debug, Clone)]
pub struct CachePerformanceMetrics {
pub hit_rate: f64,
pub bandwidth_utilization: f64,
pub cache_friendly_accesses: usize,
}
impl ParallelOptimizer {
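/// Create an optimizer that will use `num_threads` worker threads; the thread pool
/// itself is created lazily on first use.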
pub fn new(num_threads: usize) -> Self {
Self {
num_threads,
thread_pool: None,
numa_info: NumaTopology::detect(),
load_balancer: LoadBalancingStrategy::Adaptive,
work_stealing_config: WorkStealingConfig::default(),
}
}
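/// Create the worker thread pool up front.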
pub fn initialize(&mut self) -> IntegrateResult<()> {
let thread_pool = ThreadPool::new(self.num_threads, &self.work_stealing_config)?;
self.thread_pool = Some(thread_pool);
Ok(())
}
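/// Execute a batch of tasks on the pool (initializing it if necessary) and return
/// the per-task results together with execution statistics.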
pub fn execute_parallel<T: ParallelTask + Send + 'static>(
&mut self,
tasks: Vec<Box<T>>,
) -> IntegrateResult<(Vec<ParallelResult>, ParallelExecutionStats)> {
let start_time = Instant::now();
if self.thread_pool.is_none() {
self.initialize()?;
}
let optimized_tasks = self.optimize_task_distribution(tasks)?;
let results = self
.thread_pool
.as_ref()
.expect("thread pool was just initialized")
.execute_tasks(optimized_tasks)?;
let stats = self.collect_execution_stats(
start_time,
self.thread_pool.as_ref().expect("thread pool was just initialized"),
)?;
Ok((results, stats))
}
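/// Reorder, subdivide, or group tasks according to the current load-balancing strategy.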
fn optimize_task_distribution<T: ParallelTask + Send + 'static>(
&mut self,
mut tasks: Vec<Box<T>>,
) -> IntegrateResult<Vec<Box<dyn ParallelTask + Send>>> {
match self.load_balancer {
LoadBalancingStrategy::Static => {
Ok(tasks
.into_iter()
.map(|t| t as Box<dyn ParallelTask + Send>)
.collect())
}
LoadBalancingStrategy::Dynamic => {
tasks.sort_by(|a, b| {
b.estimated_cost()
.partial_cmp(&a.estimated_cost())
.unwrap_or(std::cmp::Ordering::Equal)
});
Ok(tasks
.into_iter()
.map(|t| t as Box<dyn ParallelTask + Send>)
.collect())
}
LoadBalancingStrategy::WorkStealing => {
let mut optimized_tasks = Vec::new();
for task in tasks {
if task.can_subdivide() && task.estimated_cost() > 1000.0 {
let subtasks = task.subdivide();
optimized_tasks.extend(subtasks);
} else {
optimized_tasks.push(task as Box<dyn ParallelTask + Send>);
}
}
Ok(optimized_tasks)
}
LoadBalancingStrategy::NumaAware => {
let mut numa_groups: Vec<Vec<Box<dyn ParallelTask + Send>>> =
(0..self.numa_info.num_nodes).map(|_| Vec::new()).collect();
let mut no_preference = Vec::new();
for task in tasks {
if let Some(preferred_node) = task.preferred_numa_node() {
if preferred_node < numa_groups.len() {
numa_groups[preferred_node].push(task as Box<dyn ParallelTask + Send>);
} else {
no_preference.push(task as Box<dyn ParallelTask + Send>);
}
} else {
no_preference.push(task as Box<dyn ParallelTask + Send>);
}
}
for (i, task) in no_preference.into_iter().enumerate() {
let group_idx = i % numa_groups.len();
numa_groups[group_idx].push(task);
}
Ok(numa_groups.into_iter().flatten().collect())
}
LoadBalancingStrategy::Adaptive => {
let total_cost: f64 = tasks.iter().map(|t| t.estimated_cost()).sum();
let avg_cost = total_cost / tasks.len() as f64;
if avg_cost > 1000.0 {
self.load_balancer = LoadBalancingStrategy::WorkStealing;
} else if tasks.iter().any(|t| t.preferred_numa_node().is_some()) {
self.load_balancer = LoadBalancingStrategy::NumaAware;
} else {
self.load_balancer = LoadBalancingStrategy::Dynamic;
}
self.optimize_task_distribution(tasks)
}
}
}
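/// Build execution statistics for the last run. Several metrics are currently
/// placeholder estimates rather than measured values.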
fn collect_execution_stats(
&self,
start_time: Instant,
thread_pool: &ThreadPool,
) -> IntegrateResult<ParallelExecutionStats> {
let total_time = start_time.elapsed();
// Placeholder per-thread timings: per-worker measurements are not collected yet.
let thread_times: Vec<Duration> = thread_pool
.workers
.iter()
.map(|_| Duration::from_millis(100))
.collect();
let max_time = thread_times.iter().max().unwrap_or(&Duration::ZERO);
let avg_time = thread_times.iter().sum::<Duration>() / thread_times.len().max(1) as u32;
let load_balance_efficiency = if *max_time > Duration::ZERO {
avg_time.as_secs_f64() / max_time.as_secs_f64()
} else {
1.0
};
// Placeholder work-stealing statistics; per-worker steal counters are not aggregated yet.
let work_stealing_stats = WorkStealingStats {
steal_attempts: 100,
successful_steals: 80,
success_rate: 0.8,
steal_time_ratio: 0.1,
};
Ok(ParallelExecutionStats {
total_time,
thread_times,
load_balance_efficiency,
work_stealing_stats,
// Representative placeholder values until hardware counters are wired in.
numa_affinity_hits: 95,
cache_performance: CachePerformanceMetrics {
hit_rate: 0.92,
bandwidth_utilization: 0.75,
cache_friendly_accesses: 1000,
},
simd_utilization: 0.85,
})
}
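/// Apply a vector operation to the input array in row chunks of `chunk_size`.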
pub fn execute_vectorized<F: IntegrateFloat>(
&self,
task: VectorizedComputeTask<F>,
) -> IntegrateResult<Array2<F>> {
let chunk_size = task.chunk_size.max(1);
let input_shape = task.input.dim();
let mut result = Array2::zeros(input_shape);
for chunk_start in (0..input_shape.0).step_by(chunk_size) {
let chunk_end = (chunk_start + chunk_size).min(input_shape.0);
let chunk = task.input.slice(s![chunk_start..chunk_end, ..]);
let chunk_result = match &task.operation {
VectorOperation::ElementWise(op) => {
self.apply_elementwise_operation(&chunk, *op)?
}
VectorOperation::MatrixVector(vec) => self.apply_matvec_operation(&chunk, vec)?,
VectorOperation::Reduction(op) => {
// The reduction is computed per chunk and broadcast across that chunk.
let reduced = self.apply_reduction_operation(&chunk, *op)?;
Array2::from_elem(chunk.dim(), reduced[[0, 0]])
}
VectorOperation::Custom(func) => func(&chunk),
};
result
.slice_mut(s![chunk_start..chunk_end, ..])
.assign(&chunk_result);
}
Ok(result)
}
fn apply_elementwise_operation<F: IntegrateFloat>(
&self,
input: &ArrayView2<F>,
op: ArithmeticOp,
) -> IntegrateResult<Array2<F>> {
use ArithmeticOp::*;
let result = match op {
Add(value) => input.mapv(|x| x + F::from(value).expect("Failed to convert to float")),
Multiply(value) => {
input.mapv(|x| x * F::from(value).expect("Failed to convert to float"))
}
Power(exp) => input.mapv(|x| x.powf(F::from(exp).expect("Failed to convert to float"))),
Exp => input.mapv(|x| x.exp()),
Log => input.mapv(|x| x.ln()),
Sin => input.mapv(|x| x.sin()),
Cos => input.mapv(|x| x.cos()),
};
Ok(result)
}
fn apply_matvec_operation<F: IntegrateFloat>(
&self,
matrix: &ArrayView2<F>,
vector: &Array1<F>,
) -> IntegrateResult<Array2<F>> {
if matrix.ncols() != vector.len() {
return Err(IntegrateError::DimensionMismatch(
"Matrix columns must match vector length".to_string(),
));
}
let mut result = Array2::zeros(matrix.dim());
for (i, mut row) in result.axis_iter_mut(Axis(0)).enumerate() {
let matrix_row = matrix.row(i);
let dot_product = matrix_row.dot(vector);
row.fill(dot_product);
}
Ok(result)
}
fn apply_reduction_operation<F: IntegrateFloat>(
&self,
input: &ArrayView2<F>,
op: ReductionOp,
) -> IntegrateResult<Array2<F>> {
let result_value = match op {
ReductionOp::Sum => input.sum(),
ReductionOp::Product => input.fold(F::one(), |acc, &x| acc * x),
ReductionOp::Max => input.fold(F::neg_infinity(), |acc, &x| acc.max(x)),
ReductionOp::Min => input.fold(F::infinity(), |acc, &x| acc.min(x)),
ReductionOp::Mean => input.sum() / F::from(input.len()).expect("failed to convert length"),
ReductionOp::Variance => {
let mean = input.sum() / F::from(input.len()).expect("failed to convert length");
input.mapv(|x| (x - mean).powi(2)).sum()
/ F::from(input.len()).expect("failed to convert length")
}
};
Ok(Array2::from_elem((1, 1), result_value))
}
}
impl NumaTopology {
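/// Estimate the NUMA topology from the number of available cores. This is a
/// heuristic stand-in for querying the operating system.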
pub fn detect() -> Self {
let num_cores = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let num_nodes = (num_cores / 4).max(1);
Self {
num_nodes,
cores_per_node: vec![4; num_nodes],
bandwidth_per_node: vec![100.0; num_nodes],
inter_node_latency: vec![vec![1.0; num_nodes]; num_nodes],
}
}
pub fn get_preferred_node(&self) -> usize {
// Simplified: always prefer node 0.
0
}
}
impl Default for WorkStealingConfig {
fn default() -> Self {
Self {
max_steal_attempts: 10,
steal_ratio: 0.5,
min_steal_size: 100,
backoff_strategy: BackoffStrategy::Exponential {
initial: Duration::from_micros(1),
max: Duration::from_millis(1),
},
}
}
}
impl ThreadPool {
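/// Spawn `num_threads` worker threads. The work-stealing configuration is accepted
/// but not consumed yet.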
pub fn new(num_threads: usize, _config: &WorkStealingConfig) -> IntegrateResult<Self> {
let task_queue = Arc::new(Mutex::new(TaskQueue {
global_tasks: Vec::new(),
pending_tasks: 0,
}));
let shutdown = Arc::new(AtomicUsize::new(0));
let mut workers = Vec::with_capacity(num_threads);
for id in 0..num_threads {
let worker_queue = Arc::new(Mutex::new(LocalTaskQueue {
tasks: Vec::new(),
steals_attempted: 0,
steals_successful: 0,
}));
let task_queue_clone = Arc::clone(&task_queue);
let worker_queue_clone = Arc::clone(&worker_queue);
let shutdown_clone = Arc::clone(&shutdown);
let thread_handle = thread::spawn(move || {
Self::worker_thread_loop(id, worker_queue_clone, task_queue_clone, shutdown_clone);
});
let worker = Worker {
id,
thread: Some(thread_handle),
local_queue: worker_queue,
};
workers.push(worker);
}
Ok(Self {
workers,
task_queue,
shutdown,
})
}
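/// Distribute tasks across the worker queues and block until every queue drains or
/// a 30-second timeout elapses. Worker outputs are not collected back yet, so
/// placeholder results are returned.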
pub fn execute_tasks(
&self,
tasks: Vec<Box<dyn ParallelTask + Send>>,
) -> IntegrateResult<Vec<ParallelResult>> {
if tasks.is_empty() {
return Ok(Vec::new());
}
let num_tasks = tasks.len();
{
let mut global_queue = self.task_queue.lock().expect("task queue mutex poisoned");
// Subdivide expensive tasks so they can be balanced across workers.
let mut all_tasks = Vec::new();
for task in tasks {
if task.can_subdivide() && task.estimated_cost() > 10.0 {
let subtasks = task.subdivide();
all_tasks.extend(subtasks);
} else {
all_tasks.push(task);
}
}
// Schedule the most expensive tasks first.
all_tasks.sort_by(|a, b| {
b.estimated_cost()
.partial_cmp(&a.estimated_cost())
.unwrap_or(std::cmp::Ordering::Equal)
});
// `pending_tasks` counts only tasks waiting in the global queue; tasks handed
// directly to a worker's local queue are covered by the idle check below.
global_queue.pending_tasks = 0;
for (i, task) in all_tasks.into_iter().enumerate() {
// High- and critical-priority tasks are concentrated on the first half of the workers.
let worker_idx = if task.priority() == TaskPriority::High
|| task.priority() == TaskPriority::Critical
{
i % (self.workers.len() / 2).max(1)
} else {
i % self.workers.len()
};
if let Ok(mut local_queue) = self.workers[worker_idx].local_queue.try_lock() {
local_queue.tasks.push(task);
} else {
global_queue.global_tasks.push(task);
global_queue.pending_tasks += 1;
}
}
}
// Make sure workers are running, then poll until every queue drains or we time out.
self.shutdown.store(0, Ordering::Relaxed);
let start_time = Instant::now();
let timeout = Duration::from_secs(30);
loop {
thread::sleep(Duration::from_millis(10));
let global_queue = self.task_queue.lock().expect("task queue mutex poisoned");
let all_workers_idle = self.workers.iter().all(|w| {
if let Ok(local_q) = w.local_queue.lock() {
local_q.tasks.is_empty()
} else {
false
}
});
if global_queue.pending_tasks == 0
&& global_queue.global_tasks.is_empty()
&& all_workers_idle
{
break;
}
if start_time.elapsed() > timeout {
return Err(IntegrateError::ComputationError(
"Task execution timeout".to_string(),
));
}
}
// Worker results are not routed back yet, so return one unit placeholder per
// submitted task; callers that need real outputs should use `execute_vectorized`.
let mut results = Vec::new();
for _ in 0..num_tasks {
results.push(Ok(Box::new(()) as Box<dyn std::any::Any + Send>));
}
Ok(results)
}
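/// Signal all workers to stop and join their threads.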
pub fn shutdown(&mut self) -> IntegrateResult<()> {
self.shutdown.store(1, Ordering::Relaxed);
for worker in self.workers.drain(..) {
if let Some(thread) = worker.thread {
if thread.join().is_err() {
return Err(IntegrateError::ComputationError(
"Failed to join worker thread".to_string(),
));
}
}
}
Ok(())
}
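/// Attempt to "steal" work for an idle worker. Peer local queues are not visible
/// from here, so this currently falls back to pulling from the global queue.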
fn try_work_stealing(
_worker_id: usize,
local_queue: &Arc<Mutex<LocalTaskQueue>>,
global_queue: &Arc<Mutex<TaskQueue>>,
) -> Option<Box<dyn ParallelTask + Send>> {
if let Ok(mut local_q) = local_queue.lock() {
local_q.steals_attempted += 1;
}
if let Ok(mut global_q) = global_queue.lock() {
let task = global_q.global_tasks.pop();
if task.is_some() {
global_q.pending_tasks = global_q.pending_tasks.saturating_sub(1);
if let Ok(mut local_q) = local_queue.lock() {
local_q.steals_successful += 1;
}
}
task
} else {
None
}
}
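/// Worker main loop: drain the local queue, then the global queue, then attempt a
/// steal; sleep briefly when idle, and exit once shutdown is signalled.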
fn worker_thread_loop(
_worker_id: usize,
local_queue: Arc<Mutex<LocalTaskQueue>>,
global_queue: Arc<Mutex<TaskQueue>>,
shutdown: Arc<AtomicUsize>,
) {
loop {
if shutdown.load(Ordering::Relaxed) == 1 {
break;
}
let mut task_option = None;
if let Ok(mut local_q) = local_queue.lock() {
task_option = local_q.tasks.pop();
}
if task_option.is_none() {
if let Ok(mut global_q) = global_queue.lock() {
task_option = global_q.global_tasks.pop();
if task_option.is_some() {
global_q.pending_tasks = global_q.pending_tasks.saturating_sub(1);
}
}
}
if task_option.is_none() {
task_option = Self::try_work_stealing(_worker_id, &local_queue, &global_queue);
}
if let Some(task) = task_option {
// Execute the task; its result is currently discarded (see `execute_tasks`).
let _result = task.execute();
} else {
thread::sleep(Duration::from_millis(1));
}
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
self.shutdown.store(1, Ordering::Relaxed);
for worker in self.workers.drain(..) {
if let Some(thread) = worker.thread {
let _ = thread.join();
}
}
}
}
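/// Executing a `VectorizedComputeTask` directly applies the operation to the whole
/// input at once; chunked execution happens when the task is subdivided.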
impl<F: IntegrateFloat + Send + Sync> ParallelTask for VectorizedComputeTask<F> {
fn execute(&self) -> ParallelResult {
let result: Array2<F> = match &self.operation {
VectorOperation::ElementWise(op) => match op {
ArithmeticOp::Add(value) => self
.input
.mapv(|x| x + F::from(*value).expect("Failed to convert to float")),
ArithmeticOp::Multiply(value) => self
.input
.mapv(|x| x * F::from(*value).expect("Failed to convert to float")),
ArithmeticOp::Power(exp) => self
.input
.mapv(|x| x.powf(F::from(*exp).expect("Failed to convert to float"))),
ArithmeticOp::Exp => self.input.mapv(|x| x.exp()),
ArithmeticOp::Log => self.input.mapv(|x| x.ln()),
ArithmeticOp::Sin => self.input.mapv(|x| x.sin()),
ArithmeticOp::Cos => self.input.mapv(|x| x.cos()),
},
VectorOperation::MatrixVector(vector) => {
if self.input.ncols() != vector.len() {
return Err(IntegrateError::DimensionMismatch(
"Matrix columns must match vector length".to_string(),
));
}
let mut result = Array2::zeros(self.input.dim());
for (i, mut row) in result.axis_iter_mut(Axis(0)).enumerate() {
let matrix_row = self.input.row(i);
let dot_product = matrix_row.dot(vector);
row.fill(dot_product);
}
result
}
VectorOperation::Reduction(op) => {
let result_value = match op {
ReductionOp::Sum => self.input.sum(),
ReductionOp::Product => self.input.fold(F::one(), |acc, &x| acc * x),
ReductionOp::Max => self.input.fold(F::neg_infinity(), |acc, &x| acc.max(x)),
ReductionOp::Min => self.input.fold(F::infinity(), |acc, &x| acc.min(x)),
ReductionOp::Mean => {
self.input.sum() / F::from(self.input.len()).expect("failed to convert length")
}
ReductionOp::Variance => {
let mean =
self.input.sum() / F::from(self.input.len()).expect("failed to convert length");
self.input.mapv(|x| (x - mean).powi(2)).sum()
/ F::from(self.input.len()).expect("failed to convert length")
}
};
Array2::from_elem((1, 1), result_value)
}
VectorOperation::Custom(func) => func(&self.input.view()),
};
Ok(Box::new(result) as Box<dyn std::any::Any + Send>)
}
fn estimated_cost(&self) -> f64 {
(self.input.len() as f64) / (self.chunk_size as f64)
}
fn can_subdivide(&self) -> bool {
self.input.nrows() > self.chunk_size * 2
}
fn subdivide(&self) -> Vec<Box<dyn ParallelTask + Send>> {
if self.input.nrows() < self.chunk_size * 2 {
return vec![];
}
let num_chunks = self.input.nrows().div_ceil(self.chunk_size);
let mut subtasks = Vec::with_capacity(num_chunks);
for i in 0..num_chunks {
let start_row = i * self.chunk_size;
let end_row = ((i + 1) * self.chunk_size).min(self.input.nrows());
if start_row < self.input.nrows() {
let chunk = self.input.slice(s![start_row..end_row, ..]).to_owned();
let subtask = VectorizedComputeTask {
input: chunk,
operation: self.operation.clone(),
chunk_size: self.chunk_size,
prefer_simd: self.prefer_simd,
};
subtasks.push(Box::new(subtask) as Box<dyn ParallelTask + Send>);
}
}
subtasks
}
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::Array2;
#[test]
fn test_parallel_optimizer_creation() {
let optimizer = ParallelOptimizer::new(4);
assert_eq!(optimizer.num_threads, 4);
}
#[test]
fn test_numa_topology_detection() {
let topology = NumaTopology::detect();
assert!(topology.num_nodes > 0);
assert!(!topology.cores_per_node.is_empty());
}
#[test]
fn test_vectorized_computation() {
let optimizer = ParallelOptimizer::new(2);
let input = Array2::from_elem((4, 4), 1.0);
let task = VectorizedComputeTask {
input,
operation: VectorOperation::ElementWise(ArithmeticOp::Add(2.0)),
chunk_size: 2,
prefer_simd: true,
};
let result = optimizer.execute_vectorized(task);
assert!(result.is_ok());
let output = result.expect("vectorized computation should succeed");
assert_eq!(output.dim(), (4, 4));
assert!((output[[0, 0]] - 3.0_f64).abs() < 1e-10);
}
}