use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::{Array1, Array2, ArrayView1, ArrayView2, Axis};
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{parallel_ops::*, simd_ops::SimdUnifiedOps, validation::*};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::thread;
/// Tuning knobs for the adaptive parallel processor.
///
/// Controls how many threads may be used, how often the adaptation loop
/// re-evaluates performance, and which optional balancing features are on.
#[derive(Debug, Clone)]
pub struct AdaptiveParallelConfig {
    /// Thread count to start with; `None` defers to the detected CPU count.
    pub initial_threads: Option<usize>,
    /// Lower bound the adaptation controller may scale down to.
    pub min_threads: usize,
    /// Upper bound the adaptation controller may scale up to.
    pub max_threads: usize,
    /// How often the adaptation loop re-evaluates performance.
    pub adaptation_interval: Duration,
    /// Utilization above which more threads are requested.
    pub scale_up_threshold: f64,
    /// Utilization below which threads are released.
    pub scale_down_threshold: f64,
    /// Enables adaptive chunk sizing based on the current load factor.
    pub enable_load_balancing: bool,
    /// Enables the work-stealing distribution strategy.
    pub enable_work_stealing: bool,
    /// Enables NUMA-node-affine work placement.
    pub enable_numa_affinity: bool,
    /// Number of metric samples kept in the sliding performance window.
    pub performance_windowsize: usize,
}
impl Default for AdaptiveParallelConfig {
    /// Derives sensible defaults from the number of logical CPUs.
    fn default() -> Self {
        let cpus = num_cpus::get();
        Self {
            initial_threads: Some(cpus),
            // Keep a floor of 2 so small machines still get some parallelism.
            min_threads: usize::max(2, cpus / 4),
            max_threads: cpus * 2,
            adaptation_interval: Duration::from_millis(100),
            scale_up_threshold: 0.8,
            scale_down_threshold: 0.3,
            enable_load_balancing: true,
            enable_work_stealing: true,
            enable_numa_affinity: true,
            performance_windowsize: 10,
        }
    }
}
/// How work is split across worker threads.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WorkDistributionStrategy {
    /// Fixed-size chunks, one per thread.
    EqualChunks,
    /// Chunk sizes adjusted by the current load factor.
    AdaptiveChunks,
    /// Idle workers steal units from busy workers' queues.
    WorkStealing,
    /// Chunks placed per NUMA node to keep memory local.
    NumaAware,
    /// Chunk size predicted by the performance-predictor model.
    MLGuided,
}
/// Snapshot of runtime performance counters used by the adaptation loop.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Operations completed per second.
    pub throughput: f64,
    /// Fraction of CPU time in use (0.0-1.0).
    pub cpu_utilization: f64,
    /// Observed memory bandwidth (units defined by the monitor).
    pub memory_bandwidth: f64,
    /// Cache hits / total accesses (0.0-1.0).
    pub cache_hit_ratio: f64,
    /// Useful work per thread relative to ideal scaling.
    pub thread_efficiency: f64,
    /// Spread between the most- and least-loaded workers.
    pub load_imbalance: f64,
    /// Number of threads currently doing work.
    pub active_threads: usize,
}
/// A schedulable piece of work holding a slice of the input.
#[derive(Debug, Clone)]
pub struct WorkUnit<T> {
    /// Monotonically increasing unit identifier.
    pub id: usize,
    /// The payload (typically an array view over one chunk).
    pub data: T,
    /// Relative computational cost estimate for this unit.
    pub complexity: f64,
    /// Scheduling priority (higher runs earlier).
    pub priority: u8,
    /// NUMA node this unit should preferably execute on, if any.
    pub preferred_numa_node: Option<usize>,
}
/// Orchestrates adaptive parallel statistics computations over element type `F`.
///
/// Owns the configuration, the currently selected distribution strategy, and
/// shared handles to the monitor, thread pool, work queue, and controller.
pub struct AdaptiveParallelProcessor<F> {
    // Static tuning parameters supplied at construction.
    config: AdaptiveParallelConfig,
    // Strategy chosen for the most recent operation.
    strategy: WorkDistributionStrategy,
    // Shared, read-mostly performance counters.
    performance_monitor: Arc<RwLock<PerformanceMonitor>>,
    // Dynamically resizable worker pool.
    thread_pool: Arc<RwLock<DynamicThreadPool>>,
    // Pending work units, ordered by priority.
    work_queue: Arc<Mutex<PriorityWorkQueue<ArrayView1<F>>>>,
    // Decides when to scale up/down or change strategy.
    adaptation_controller: Arc<RwLock<AdaptationController>>,
    // Detected NUMA layout; `_phantom` ties the element type `F` to the struct.
    numa_topology: NumaTopology, _phantom: std::marker::PhantomData<F>,
}
/// Collects and retains recent performance measurements.
#[derive(Debug)]
pub struct PerformanceMonitor {
    // Sliding window of past metric snapshots.
    performance_history: VecDeque<PerformanceMetrics>,
    // Most recent snapshot.
    current_metrics: PerformanceMetrics,
    // When monitoring began, for throughput computation.
    start_time: Instant,
    // Total operations completed since `start_time`.
    operations_completed: AtomicUsize,
}
/// A thread pool whose size can grow and shrink at runtime.
#[derive(Debug)]
pub struct DynamicThreadPool {
    // Worker handles and per-worker state.
    workers: Vec<WorkerThread>,
    // Number of workers currently considered active.
    active_count: AtomicUsize,
    // Source of unique worker ids.
    thread_counter: AtomicUsize,
    // Signals all workers to exit.
    shutdown: AtomicBool,
}
/// Per-worker bookkeeping inside the dynamic pool.
#[derive(Debug)]
pub struct WorkerThread {
    // Join handle; `None` once the thread has been joined.
    handle: Option<thread::JoinHandle<()>>,
    // Unique worker id.
    id: usize,
    // NUMA node this worker is pinned to, if affinity is enabled.
    numa_node: Option<usize>,
    // Current load estimate (0.0-1.0), shared with the scheduler.
    current_load: Arc<Mutex<f64>>,
    // Whether the worker is currently processing work.
    active: AtomicBool,
}
/// Three-level priority queue with per-worker steal queues.
#[derive(Debug)]
pub struct PriorityWorkQueue<T> {
    // Units drained before anything else.
    high_priority: VecDeque<WorkUnit<T>>,
    // Default priority level.
    medium_priority: VecDeque<WorkUnit<T>>,
    // Background work, drained last.
    low_priority: VecDeque<WorkUnit<T>>,
    // Per-worker-id queues that idle workers may steal from.
    steal_queues: HashMap<usize, VecDeque<WorkUnit<T>>>,
    // Aggregate counters about queue behavior.
    queue_stats: QueueStatistics,
}
/// Counters describing work-queue throughput and stealing behavior.
#[derive(Debug, Clone)]
pub struct QueueStatistics {
    /// Total units dequeued and executed.
    pub tasks_processed: usize,
    /// Mean time units spent waiting in the queue.
    pub avg_wait_time: Duration,
    /// Steal attempts that obtained a unit.
    pub successful_steals: usize,
    /// Steal attempts that found nothing to take.
    pub failed_steals: usize,
}
/// Decides when and how to rescale the pool or change strategy.
#[derive(Debug)]
pub struct AdaptationController {
    // Lightweight model predicting good chunk counts from workload features.
    performance_predictor: PerformancePredictor,
    // Past scaling decisions, kept for evaluating their benefit.
    scaling_history: VecDeque<ScalingDecision>,
    // Timestamp of the last adaptation, for rate limiting.
    last_adaptation: Instant,
    // Aggregate statistics about adaptations performed.
    adaptation_stats: AdaptationStatistics,
}
/// Linear model mapping workload features to a predicted chunk count.
#[derive(Debug)]
pub struct PerformancePredictor {
    // One weight per workload feature (see `extract_workload_features`).
    weights: Vec<f64>,
    // (features, observed outcome) pairs retained for future fitting.
    trainingdata: VecDeque<(Vec<f64>, f64)>,
    // Most recent estimate of prediction accuracy (0.0-1.0).
    accuracy: f64,
}
/// Record of one adaptation decision and its (eventual) outcome.
#[derive(Debug, Clone)]
pub struct ScalingDecision {
    /// When the decision was made.
    pub timestamp: Instant,
    /// What was decided (scale, rebalance, strategy change, nothing).
    pub action: ScalingAction,
    /// Human-readable justification for the decision.
    pub reason: String,
    /// Predicted performance gain at decision time.
    pub expected_benefit: f64,
    /// Measured gain, filled in after the fact; `None` until evaluated.
    pub actual_benefit: Option<f64>,
}
/// The possible outcomes of an adaptation step.
#[derive(Debug, Clone, Copy)]
pub enum ScalingAction {
    /// Grow the pool to `new_threads` workers.
    ScaleUp { new_threads: usize },
    /// Shrink the pool to `new_threads` workers.
    ScaleDown { new_threads: usize },
    /// Switch the work-distribution strategy.
    ChangeStrategy { new_strategy: WorkDistributionStrategy },
    /// Redistribute existing work without resizing.
    Rebalance,
    /// Leave everything as-is.
    NoAction,
}
/// Aggregate statistics about the adaptation controller's behavior.
#[derive(Debug, Clone)]
pub struct AdaptationStatistics {
    /// Total adaptation decisions taken.
    pub total_adaptations: usize,
    /// Decisions whose measured benefit was positive.
    pub successful_adaptations: usize,
    /// Mean wall-clock overhead per adaptation.
    pub avg_adaptation_overhead: Duration,
    /// Best throughput observed so far.
    pub best_performance: f64,
}
/// Description of the machine's NUMA layout.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Number of NUMA nodes (1 on UMA machines).
    pub num_nodes: usize,
    /// Core count per node, indexed by node id.
    pub cores_per_node: Vec<usize>,
    /// Per-node memory bandwidth estimates.
    pub memory_bandwidth: Vec<f64>,
    /// Pairwise inter-node latency factors; `latency_matrix[i][j]` is i→j.
    pub latency_matrix: Vec<Vec<f64>>,
}
impl<F> AdaptiveParallelProcessor<F>
where
    F: Float + NumCast + SimdUnifiedOps + Zero + One + PartialOrd + Copy + Send + Sync + 'static
        + std::fmt::Display,
{
    /// Creates a processor using the default adaptive configuration.
    pub fn new() -> Self {
        Self::with_config(AdaptiveParallelConfig::default())
    }

    /// Creates a processor from an explicit configuration, detecting the NUMA
    /// topology and wiring up the monitor, thread pool, work queue, and
    /// adaptation controller.
    pub fn with_config(config: AdaptiveParallelConfig) -> Self {
        let numa_topology = NumaTopology::detect();
        let performance_monitor = Arc::new(RwLock::new(PerformanceMonitor::new()));
        // BUG FIX: the original passed `&_config`, a name that does not exist
        // in this scope (the parameter is `config`).
        let thread_pool = Arc::new(RwLock::new(DynamicThreadPool::new(&config, &numa_topology)));
        let work_queue = Arc::new(Mutex::new(PriorityWorkQueue::new()));
        let adaptation_controller = Arc::new(RwLock::new(AdaptationController::new()));
        let mut processor = Self {
            strategy: WorkDistributionStrategy::AdaptiveChunks,
            config,
            performance_monitor,
            thread_pool,
            work_queue,
            adaptation_controller,
            // BUG FIX: the original initialized a nonexistent field
            // `numa_topology_phantom`; the struct declares `numa_topology`
            // and `_phantom` as separate fields.
            numa_topology,
            _phantom: std::marker::PhantomData,
        };
        processor.start_adaptation_loop();
        processor
    }

    /// Computes the arithmetic mean of `data`, choosing a work-distribution
    /// strategy from the input size.
    ///
    /// # Errors
    /// Returns `StatsError::InvalidArgument` for empty input; propagates
    /// whatever `checkarray_finite` reports for non-finite values.
    pub fn adaptive_mean(&mut self, data: &ArrayView1<F>) -> StatsResult<F> {
        checkarray_finite(data, "data")?;
        if data.is_empty() {
            return Err(StatsError::InvalidArgument("Data cannot be empty".to_string()));
        }
        let n = data.len();
        self.strategy = self.select_optimal_strategy(n, 1.0);
        match self.strategy {
            WorkDistributionStrategy::EqualChunks => self.compute_mean_equal_chunks(data),
            WorkDistributionStrategy::AdaptiveChunks => self.compute_mean_adaptive_chunks(data),
            WorkDistributionStrategy::WorkStealing => self.compute_mean_work_stealing(data),
            WorkDistributionStrategy::NumaAware => self.compute_mean_numa_aware(data),
            WorkDistributionStrategy::MLGuided => self.compute_mean_ml_guided(data),
        }
    }

    /// Computes the variance of `data` with `ddof` delta degrees of freedom
    /// (e.g. `ddof = 1` for the sample variance).
    ///
    /// # Errors
    /// Returns `StatsError::InvalidArgument` when `data.len() <= ddof`;
    /// propagates errors from the finiteness check and the mean computation.
    pub fn adaptive_variance(&mut self, data: &ArrayView1<F>, ddof: usize) -> StatsResult<F> {
        checkarray_finite(data, "data")?;
        if data.len() <= ddof {
            return Err(StatsError::InvalidArgument(
                "Insufficient data for variance calculation".to_string(),
            ));
        }
        let mean = self.adaptive_mean(data)?;
        let n = data.len();
        // Variance does roughly twice the per-element work of a mean.
        self.strategy = self.select_optimal_strategy(n, 2.0);
        match self.strategy {
            WorkDistributionStrategy::AdaptiveChunks => {
                self.compute_variance_adaptive_chunks(data, mean, ddof)
            }
            WorkDistributionStrategy::WorkStealing => {
                self.compute_variance_work_stealing(data, mean, ddof)
            }
            WorkDistributionStrategy::NumaAware => {
                self.compute_variance_numa_aware(data, mean, ddof)
            }
            _ => self.compute_variance_equal_chunks(data, mean, ddof),
        }
    }

    /// Heuristic strategy choice from data size, per-element complexity, and
    /// the detected hardware.
    fn select_optimal_strategy(&self, datasize: usize, complexity: f64) -> WorkDistributionStrategy {
        let numa_nodes = self.numa_topology.num_nodes;
        let cpu_count = num_cpus::get();
        if datasize < 1000 {
            // Too small for parallel overhead to pay off; keep it simple.
            WorkDistributionStrategy::EqualChunks
        } else if numa_nodes > 1 && datasize > 100000 {
            WorkDistributionStrategy::NumaAware
        } else if complexity > 3.0 && cpu_count >= 8 {
            WorkDistributionStrategy::WorkStealing
        } else if self.config.enable_load_balancing {
            WorkDistributionStrategy::AdaptiveChunks
        } else {
            WorkDistributionStrategy::EqualChunks
        }
    }

    /// Mean via fixed-size chunks handed to the parallel map/reduce helper.
    fn compute_mean_equal_chunks(&self, data: &ArrayView1<F>) -> StatsResult<F> {
        let n = data.len();
        let num_threads = self.get_optimal_thread_count(n, 1.0);
        // BUG FIX: guard against a zero chunk size when n < num_threads.
        let chunksize = (n / num_threads).max(1);
        let (total_sum, total_count) = parallel_map_reduce(
            data,
            chunksize,
            |chunk| {
                let sum: F = chunk.iter().copied().sum();
                (sum, chunk.len())
            },
            |(sum1, count1), (sum2, count2)| (sum1 + sum2, count1 + count2),
        );
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Mean via load-factor-adjusted chunk sizes.
    fn compute_mean_adaptive_chunks(&self, data: &ArrayView1<F>) -> StatsResult<F> {
        let chunks = self.create_adaptive_chunks(data, 1.0)?;
        let results: Vec<_> = chunks
            .iter()
            .map(|chunk| {
                let sum: F = chunk.iter().copied().sum();
                (sum, chunk.len())
            })
            .collect();
        // BUG FIX: original used `results.into().iter()`, which does not compile.
        let (total_sum, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Mean via prioritized work units dispatched through the stealing executor.
    fn compute_mean_work_stealing(&self, data: &ArrayView1<F>) -> StatsResult<F> {
        let work_units = self.create_work_units(data, 1.0)?;
        let results = self.execute_work_stealing(work_units, |chunk| {
            let sum: F = chunk.iter().copied().sum();
            (sum, chunk.len())
        })?;
        // BUG FIX: original used `results.into().iter()`, which does not compile.
        let (total_sum, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Mean via one chunk per NUMA node.
    fn compute_mean_numa_aware(&self, data: &ArrayView1<F>) -> StatsResult<F> {
        let numa_chunks = self.distribute_numa_aware(data)?;
        // BUG FIX: original used `numa_chunks.into().iter()`, which does not compile.
        let results: Vec<_> = numa_chunks
            .into_iter()
            .map(|(_node_id, chunk)| {
                let sum: F = chunk.iter().copied().sum();
                (sum, chunk.len())
            })
            .collect();
        let (total_sum, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Mean with a chunk size predicted by the performance model.
    fn compute_mean_ml_guided(&self, data: &ArrayView1<F>) -> StatsResult<F> {
        let features = self.extract_workload_features(data, 1.0);
        let predicted_optimal_chunks = if let Ok(controller) = self.adaptation_controller.read() {
            controller.performance_predictor.predict_optimal_chunks(&features)
        } else {
            // Lock poisoned: fall back to one chunk per CPU.
            data.len() / num_cpus::get()
        };
        // BUG FIX: final `.max(1)` prevents a zero chunk size for tiny inputs
        // (len / 2 == 0 when len == 1).
        let chunksize = predicted_optimal_chunks.max(100).min(data.len() / 2).max(1);
        let (total_sum, total_count) = parallel_map_reduce(
            data,
            chunksize,
            |chunk| {
                let sum: F = chunk.iter().copied().sum();
                (sum, chunk.len())
            },
            |(sum1, count1), (sum2, count2)| (sum1 + sum2, count1 + count2),
        );
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Variance (sum of squared deviations from `mean`) via adaptive chunks.
    fn compute_variance_adaptive_chunks(&self, data: &ArrayView1<F>, mean: F, ddof: usize) -> StatsResult<F> {
        let chunks = self.create_adaptive_chunks(data, 2.0)?;
        let results: Vec<_> = chunks
            .iter()
            .map(|chunk| {
                let sum_squared_diffs: F = chunk.iter()
                    .map(|&x| (x - mean) * (x - mean))
                    .sum();
                (sum_squared_diffs, chunk.len())
            })
            .collect();
        // BUG FIX: original used `results.into().iter()`, which does not compile.
        let (total_sum_sq_diffs, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        // `total_count > ddof` is guaranteed by the check in `adaptive_variance`.
        Ok(total_sum_sq_diffs / F::from(total_count - ddof).expect("Failed to convert to float"))
    }

    /// Variance via the work-stealing executor.
    fn compute_variance_work_stealing(&self, data: &ArrayView1<F>, mean: F, ddof: usize) -> StatsResult<F> {
        let work_units = self.create_work_units(data, 2.0)?;
        let results = self.execute_work_stealing(work_units, move |chunk| {
            let sum_squared_diffs: F = chunk.iter()
                .map(|&x| (x - mean) * (x - mean))
                .sum();
            (sum_squared_diffs, chunk.len())
        })?;
        // BUG FIX: original used `results.into().iter()`, which does not compile.
        let (total_sum_sq_diffs, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        Ok(total_sum_sq_diffs / F::from(total_count - ddof).expect("Failed to convert to float"))
    }

    /// Variance with one chunk per NUMA node.
    fn compute_variance_numa_aware(&self, data: &ArrayView1<F>, mean: F, ddof: usize) -> StatsResult<F> {
        let numa_chunks = self.distribute_numa_aware(data)?;
        // BUG FIX: original used `numa_chunks.into().iter()`, which does not compile.
        let results: Vec<_> = numa_chunks
            .into_iter()
            .map(|(_node_id, chunk)| {
                let sum_squared_diffs: F = chunk.iter()
                    .map(|&x| (x - mean) * (x - mean))
                    .sum();
                (sum_squared_diffs, chunk.len())
            })
            .collect();
        let (total_sum_sq_diffs, total_count) = results
            .into_iter()
            .fold((F::zero(), 0usize), |(acc_sum, acc_count), (sum, count)| {
                (acc_sum + sum, acc_count + count)
            });
        Ok(total_sum_sq_diffs / F::from(total_count - ddof).expect("Failed to convert to float"))
    }

    /// Variance via fixed-size chunks and the parallel map/reduce helper.
    fn compute_variance_equal_chunks(&self, data: &ArrayView1<F>, mean: F, ddof: usize) -> StatsResult<F> {
        let n = data.len();
        let num_threads = self.get_optimal_thread_count(n, 2.0);
        // BUG FIX: guard against a zero chunk size when n < num_threads.
        let chunksize = (n / num_threads).max(1);
        let (total_sum_sq_diffs, total_count) = parallel_map_reduce(
            data,
            chunksize,
            |chunk| {
                let sum_squared_diffs: F = chunk.iter()
                    .map(|&x| (x - mean) * (x - mean))
                    .sum();
                (sum_squared_diffs, chunk.len())
            },
            |(sum1, count1), (sum2, count2)| (sum1 + sum2, count1 + count2),
        );
        Ok(total_sum_sq_diffs / F::from(total_count - ddof).expect("Failed to convert to float"))
    }

    /// Splits `data` into contiguous views whose size is scaled by the
    /// current load factor.
    fn create_adaptive_chunks(&self, data: &ArrayView1<F>, complexity: f64) -> StatsResult<Vec<ArrayView1<F>>> {
        let n = data.len();
        let num_threads = self.get_optimal_thread_count(n, complexity);
        let base_chunksize = n / num_threads;
        let load_factor = self.get_current_load_factor();
        // BUG FIX: `.max(1)` — a zero chunk size (small n or low load factor)
        // made the loop below spin forever without advancing `start`.
        let adjusted_chunksize = ((base_chunksize as f64 * load_factor) as usize).max(1);
        let mut chunks = Vec::new();
        let mut start = 0;
        while start < n {
            let end = (start + adjusted_chunksize).min(n);
            chunks.push(data.slice(scirs2_core::ndarray::s![start..end]));
            start = end;
        }
        Ok(chunks)
    }

    /// Wraps contiguous slices of `data` in `WorkUnit`s, round-robining the
    /// preferred NUMA node across units.
    fn create_work_units(&self, data: &ArrayView1<F>, complexity: f64) -> StatsResult<Vec<WorkUnit<ArrayView1<F>>>> {
        let n = data.len();
        let optimal_chunksize = self.calculate_optimal_work_unitsize(n, complexity);
        let mut work_units = Vec::new();
        let mut start = 0;
        let mut id = 0;
        while start < n {
            let end = (start + optimal_chunksize).min(n);
            let chunk = data.slice(scirs2_core::ndarray::s![start..end]);
            work_units.push(WorkUnit {
                id,
                data: chunk,
                complexity,
                priority: 5, // default (medium) priority
                preferred_numa_node: Some(id % self.numa_topology.num_nodes),
            });
            start = end;
            id += 1;
        }
        Ok(work_units)
    }

    /// Executes `work_fn` over every unit. Currently a sequential fallback; a
    /// true work-stealing scheduler would dispatch units to the thread pool.
    // NOTE: generic parameter renamed from non-camel-case `F_WORK`.
    fn execute_work_stealing<R, W>(
        &self,
        work_units: Vec<WorkUnit<ArrayView1<F>>>,
        work_fn: W,
    ) -> StatsResult<Vec<R>>
    where
        W: Fn(&ArrayView1<F>) -> R + Send + Sync + Clone + 'static,
        R: Send + 'static,
    {
        // BUG FIX: original used `work_units.into().iter()`, which does not compile.
        let results: Vec<R> = work_units
            .into_iter()
            .map(|unit| work_fn(&unit.data))
            .collect();
        Ok(results)
    }

    /// Partitions `data` into one contiguous chunk per NUMA node; the final
    /// node absorbs the remainder.
    fn distribute_numa_aware(&self, data: &ArrayView1<F>) -> StatsResult<Vec<(usize, ArrayView1<F>)>> {
        let n = data.len();
        let num_nodes = self.numa_topology.num_nodes;
        let chunksize = n / num_nodes;
        let mut numa_chunks = Vec::new();
        for node_id in 0..num_nodes {
            let start = node_id * chunksize;
            let end = if node_id == num_nodes - 1 { n } else { start + chunksize };
            if start < n {
                let chunk = data.slice(scirs2_core::ndarray::s![start..end]);
                numa_chunks.push((node_id, chunk));
            }
        }
        Ok(numa_chunks)
    }

    /// Picks a thread count from data size, complexity, and current load,
    /// clamped to the configured bounds.
    fn get_optimal_thread_count(&self, datasize: usize, complexity: f64) -> usize {
        let cpu_count = num_cpus::get();
        let current_load = self.get_current_load_factor();
        let base_threads = if datasize < 10000 {
            2.min(cpu_count)
        } else if complexity > 2.0 {
            cpu_count
        } else {
            (cpu_count * 3 / 4).max(2)
        };
        // Lower system load (factor < 1) permits proportionally more threads.
        let adjusted_threads = (base_threads as f64 * (2.0 - current_load)) as usize;
        adjusted_threads.clamp(self.config.min_threads, self.config.max_threads)
    }

    /// Chooses a work-unit size, growing with the square root of complexity.
    fn calculate_optimal_work_unitsize(&self, totalsize: usize, complexity: f64) -> usize {
        let basesize = 1000;
        let complexity_factor = complexity.sqrt();
        let adjustedsize = (basesize as f64 * complexity_factor) as usize;
        // BUG FIX: the original `clamp(100, totalsize / 4)` panics whenever
        // totalsize < 400 (clamp requires min <= max). Apply the bounds in a
        // panic-free order with a floor of 1.
        let upper = (totalsize / 4).max(1);
        adjustedsize.max(100).min(upper)
    }

    /// Feature vector consumed by the performance predictor; order must match
    /// `PerformancePredictor::weights`.
    fn extract_workload_features(&self, data: &ArrayView1<F>, complexity: f64) -> Vec<f64> {
        vec![
            data.len() as f64,
            complexity,
            num_cpus::get() as f64,
            self.get_current_load_factor(),
            self.numa_topology.num_nodes as f64,
        ]
    }

    /// Current system load estimate (0.0-1.0). Placeholder constant until a
    /// real load probe is wired in.
    fn get_current_load_factor(&self) -> f64 {
        0.5
    }

    /// Starts the background adaptation loop. Intentionally a no-op for now.
    fn start_adaptation_loop(&mut self) {
    }
}
impl PerformanceMonitor {
    /// Creates a monitor with zeroed metrics, an empty history window, and
    /// the clock started now.
    fn new() -> Self {
        let current_metrics = PerformanceMetrics {
            throughput: 0.0,
            cpu_utilization: 0.0,
            memory_bandwidth: 0.0,
            cache_hit_ratio: 0.0,
            thread_efficiency: 0.0,
            load_imbalance: 0.0,
            active_threads: 0,
        };
        Self {
            performance_history: VecDeque::with_capacity(100),
            current_metrics,
            start_time: Instant::now(),
            operations_completed: AtomicUsize::new(0),
        }
    }
}
impl DynamicThreadPool {
    /// Creates a pool sized from the configuration. Worker threads are not
    /// spawned here; `workers` only reserves capacity for them.
    ///
    /// The NUMA topology parameter is accepted for future affinity-aware
    /// spawning but is not used yet.
    fn new(config: &AdaptiveParallelConfig, _numa_topology: &NumaTopology) -> Self {
        // BUG FIX: the parameter was declared `_config` while the body read
        // `config`, so this function did not compile.
        let initial_threads = config.initial_threads.unwrap_or(num_cpus::get());
        Self {
            workers: Vec::with_capacity(config.max_threads),
            active_count: AtomicUsize::new(initial_threads),
            thread_counter: AtomicUsize::new(0),
            shutdown: AtomicBool::new(false),
        }
    }
}
impl<T> PriorityWorkQueue<T> {
    /// Creates an empty queue with zeroed statistics.
    fn new() -> Self {
        let queue_stats = QueueStatistics {
            tasks_processed: 0,
            avg_wait_time: Duration::from_secs(0),
            successful_steals: 0,
            failed_steals: 0,
        };
        Self {
            high_priority: VecDeque::new(),
            medium_priority: VecDeque::new(),
            low_priority: VecDeque::new(),
            steal_queues: HashMap::new(),
            queue_stats,
        }
    }
}
impl AdaptationController {
    /// Creates a controller with a fresh predictor, empty scaling history,
    /// and zeroed adaptation statistics.
    fn new() -> Self {
        let adaptation_stats = AdaptationStatistics {
            total_adaptations: 0,
            successful_adaptations: 0,
            avg_adaptation_overhead: Duration::from_millis(0),
            best_performance: 0.0,
        };
        Self {
            performance_predictor: PerformancePredictor::new(),
            scaling_history: VecDeque::with_capacity(1000),
            last_adaptation: Instant::now(),
            adaptation_stats,
        }
    }
}
impl PerformancePredictor {
    /// Creates a predictor with hand-picked initial weights (one per workload
    /// feature), no training data, and a neutral accuracy estimate.
    fn new() -> Self {
        Self {
            weights: vec![0.1, 0.2, 0.3, 0.2, 0.2],
            trainingdata: VecDeque::with_capacity(1000),
            accuracy: 0.5,
        }
    }

    /// Predicts a chunk count as the dot product of `features` and the model
    /// weights, floored at 100.
    fn predict_optimal_chunks(&self, features: &[f64]) -> usize {
        let prediction = features
            .iter()
            .zip(self.weights.iter())
            .fold(0.0_f64, |acc, (f, w)| acc + f * w);
        prediction.max(100.0) as usize
    }
}
impl NumaTopology {
    /// Detects the machine's NUMA layout. Currently a single-node fallback
    /// that assigns every logical CPU to node 0 with placeholder bandwidth
    /// and latency figures.
    fn detect() -> Self {
        let cpu_count = num_cpus::get();
        Self {
            num_nodes: 1,
            cores_per_node: vec![cpu_count],
            memory_bandwidth: vec![100.0],
            latency_matrix: vec![vec![1.0]],
        }
    }
}
/// Convenience wrapper: adaptive mean over `f64` data using a fresh processor
/// with the default configuration.
#[allow(dead_code)]
pub fn adaptive_mean_f64(data: &ArrayView1<f64>) -> StatsResult<f64> {
    AdaptiveParallelProcessor::<f64>::new().adaptive_mean(data)
}
/// Convenience wrapper: adaptive variance over `f64` data (with `ddof` delta
/// degrees of freedom) using a fresh default-configured processor.
#[allow(dead_code)]
pub fn adaptive_variance_f64(data: &ArrayView1<f64>, ddof: usize) -> StatsResult<f64> {
    AdaptiveParallelProcessor::<f64>::new().adaptive_variance(data, ddof)
}
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    // Mean of 1..=5 should be exactly 3.
    #[test]
    fn test_adaptive_mean_basic() {
        let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let mut processor = AdaptiveParallelProcessor::<f64>::new();
        let result = processor.adaptive_mean(&data.view()).expect("Operation failed");
        assert!((result - 3.0).abs() < 1e-10);
    }

    // Sample variance (ddof = 1) of a non-constant series must be positive.
    #[test]
    fn test_adaptive_variance_basic() {
        let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let mut processor = AdaptiveParallelProcessor::<f64>::new();
        let result = processor.adaptive_variance(&data.view(), 1).expect("Operation failed");
        assert!(result > 0.0); }

    // Different distribution strategies must agree on the result.
    // NOTE(review): `adaptive_mean` re-selects the strategy internally, so the
    // manual assignments below may be overridden before computation.
    #[test]
    fn test_work_distribution_strategies() {
        let data: Array1<f64> = Array1::from_shape_fn(10000, |i| i as f64);
        let mut processor = AdaptiveParallelProcessor::<f64>::new();
        processor.strategy = WorkDistributionStrategy::EqualChunks;
        let result1 = processor.adaptive_mean(&data.view()).expect("Operation failed");
        processor.strategy = WorkDistributionStrategy::AdaptiveChunks;
        let result2 = processor.adaptive_mean(&data.view()).expect("Operation failed");
        assert!((result1 - result2).abs() < 1e-10);
    }

    // Topology detection must report at least one node with at least one core.
    #[test]
    fn test_numa_topology_detection() {
        let topology = NumaTopology::detect();
        assert!(topology.num_nodes >= 1);
        assert!(!topology.cores_per_node.is_empty());
    }
}