use std::sync::Mutex;
pub mod thread_pools;
pub mod work_stealing;
pub mod strategies;
pub mod optimization;
pub use thread_pools::{
get_global_manager, AdvancedPerformanceStats, AdvancedPerformanceThreadPool,
AdvancedThreadPoolConfig, AffinityStrategy, AnomalySeverity, AnomalyType,
CacheAllocationPolicy, DecompositionType, DynamicSizingConfig, DynamicThreadManager,
IterativeSolverType, MemoryMetrics, MonitoringConfig, OperationType, PerformanceAnomaly,
PredictionModelParams, ProfileMetrics, ResourceIsolationConfig, ResourceUsagePattern,
ScalingDecision, ScalingReason, ScopedThreadPool, ThreadPoolConfig, ThreadPoolManager,
ThreadPoolProfile, ThreadPoolProfiler, ThreadPoolStats, WorkloadAdaptationConfig,
WorkloadCharacteristics, WorkloadPattern, WorkloadPredictor,
};
pub use work_stealing::{
AdaptiveChunking, AdaptiveChunkingStats, CacheAwareStrategy, CacheAwareWorkStealer,
CacheLocalityOptimizer, CacheOptimizationRecommendations, ChunkPerformance,
LoadBalancingParams, MatrixOperationType, MemoryAccessPattern, NumaTopology,
OptimizedSchedulerStats, OptimizedWorkStealingScheduler, PerformanceMonitor, PerformanceStats,
SchedulerStats, StealingStrategy, WorkComplexity, WorkItem, WorkPriority,
WorkStealingScheduler,
};
pub use work_stealing::matrix_ops::{
parallel_band_solve, parallel_block_gemm, parallel_cholesky_work_stealing,
parallel_eigvalsh_work_stealing, parallel_gemm_work_stealing, parallel_hessenberg_reduction,
parallel_lu_work_stealing, parallel_matvec_work_stealing, parallel_power_iteration,
parallel_qr_work_stealing, parallel_svd_work_stealing,
};
pub use work_stealing::parallel_gemm_cache_aware;
pub use strategies::{
WorkStealingScheduler as StrategyWorkStealingScheduler,
data_parallel::{
parallel_matvec, parallel_power_iteration, parallel_gemm,
parallel_conjugate_gradient, parallel_jacobi, vector_ops,
},
};
pub use optimization::{
DynamicLoadBalancer, LoadBalancingStats,
AdvancedWorkStealingScheduler, NumaTopology as OptimizationNumaTopology,
};
/// Process-wide requested worker count; `None` means "use the runtime default".
/// Guarded by a mutex so concurrent setters/getters see a consistent value.
static GLOBAL_WORKERS: Mutex<Option<usize>> = Mutex::new(None);

/// Sets the global worker count and mirrors it into `OMP_NUM_THREADS` so
/// OpenMP-backed kernels pick it up; `None` clears both the stored value and
/// the environment variable.
///
/// NOTE(review): environment mutation is process-global and not thread-safe;
/// callers should configure workers before spawning worker threads.
#[allow(dead_code)]
pub fn set_global_workers(workers: Option<usize>) {
    // Recover the data from a poisoned mutex instead of silently skipping the
    // update: the guarded Option<usize> cannot be left in an invalid state, so
    // poisoning carries no information we need to honor here.
    let mut global = GLOBAL_WORKERS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *global = workers;
    if let Some(num_workers) = workers {
        std::env::set_var("OMP_NUM_THREADS", num_workers.to_string());
    } else {
        std::env::remove_var("OMP_NUM_THREADS");
    }
}

/// Returns the currently configured global worker count, if any.
#[allow(dead_code)]
pub fn get_global_workers() -> Option<usize> {
    // As in the setter, a poisoned lock still yields the stored value rather
    // than pretending nothing was configured.
    *GLOBAL_WORKERS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
#[allow(dead_code)]
pub fn configure_workers(workers: Option<usize>) -> Option<usize> {
match workers {
Some(count) => {
std::env::set_var("OMP_NUM_THREADS", count.to_string());
Some(count)
}
None => {
let global_workers = get_global_workers();
if let Some(count) = global_workers {
std::env::set_var("OMP_NUM_THREADS", count.to_string());
}
global_workers
}
}
}
/// Tuning knobs for parallel execution: worker count, the smallest input size
/// that justifies going parallel, and the per-task chunk size.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Explicit worker count; `None` defers to the global/runtime default.
    pub workers: Option<usize>,
    /// Inputs smaller than this are processed serially.
    pub parallel_threshold: usize,
    /// Number of elements handed to each worker at a time.
    pub chunksize: usize,
}

impl Default for WorkerConfig {
    /// Defaults: no explicit worker count, 1000-element parallel threshold,
    /// 64-element chunks.
    fn default() -> Self {
        WorkerConfig {
            workers: None,
            parallel_threshold: 1000,
            chunksize: 64,
        }
    }
}

impl WorkerConfig {
    /// Equivalent to [`WorkerConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Builder-style setter for the explicit worker count.
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.workers = Some(workers);
        self
    }

    /// Builder-style setter for the parallel threshold.
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.parallel_threshold = threshold;
        self
    }

    /// Builder-style setter for the chunk size.
    pub fn with_chunksize(mut self, chunksize: usize) -> Self {
        self.chunksize = chunksize;
        self
    }

    /// Pushes this configuration to the process-wide worker settings.
    pub fn apply(&self) {
        configure_workers(self.workers);
    }
}
/// RAII guard that temporarily overrides the global worker count and restores
/// the previous setting when dropped.
pub struct ScopedWorkers {
    // Worker count that was in effect before this guard was created.
    saved: Option<usize>,
}

impl ScopedWorkers {
    /// Captures the current global worker count, installs `workers`, and
    /// returns a guard whose `Drop` restores the captured value.
    pub fn new(workers: Option<usize>) -> Self {
        let saved = get_global_workers();
        set_global_workers(workers);
        ScopedWorkers { saved }
    }
}

impl Drop for ScopedWorkers {
    fn drop(&mut self) {
        // Restore whatever was configured before the override.
        set_global_workers(self.saved);
    }
}
pub mod iter {
    use scirs2_core::parallel_ops::*;

    /// Applies `f` to fixed-size chunks of `items` in parallel, returning the
    /// per-chunk results in chunk order (the final chunk may be shorter).
    ///
    /// The chunk slices are first collected into a `Vec` because the standard
    /// slice `chunks` iterator is not itself a parallel iterator.
    ///
    /// # Panics
    ///
    /// Panics if `chunksize` is zero (propagated from `slice::chunks`).
    pub fn parallel_chunks<T, R, F>(items: &[T], chunksize: usize, f: F) -> Vec<R>
    where
        T: Send + Sync,
        R: Send,
        F: Fn(&[T]) -> R + Send + Sync,
    {
        items
            .chunks(chunksize)
            .collect::<Vec<_>>()
            .into_par_iter()
            .map(f)
            .collect()
    }

    /// Parallel map over `items` where `f` also receives each element's index;
    /// results are returned in index order.
    pub fn parallel_enumerate<T, R, F>(items: &[T], f: F) -> Vec<R>
    where
        T: Send + Sync,
        R: Send,
        F: Fn(usize, &T) -> R + Send + Sync,
    {
        items
            .par_iter()
            .enumerate()
            .map(|(i, item)| f(i, item))
            .collect()
    }
}
pub mod adaptive {
    use super::WorkerConfig;

    /// Execution strategy chosen for a given workload size.
    #[derive(Debug, Clone, Copy)]
    pub enum Strategy {
        /// Run on the current thread.
        Serial,
        /// Fan out across the worker pool.
        Parallel,
        /// Reserved for future heuristics; `choose_strategy` currently never
        /// returns this variant.
        Adaptive,
    }

    /// Picks [`Strategy::Serial`] below the configured `parallel_threshold`
    /// and [`Strategy::Parallel`] at or above it.
    pub fn choose_strategy(datasize: usize, config: &WorkerConfig) -> Strategy {
        if datasize < config.parallel_threshold {
            Strategy::Serial
        } else {
            Strategy::Parallel
        }
    }

    /// Convenience predicate: true when [`choose_strategy`] selects
    /// [`Strategy::Parallel`].
    pub fn should_use_parallel(datasize: usize, config: &WorkerConfig) -> bool {
        matches!(choose_strategy(datasize, config), Strategy::Parallel)
    }
}