// scirs2_linalg/parallel/mod.rs

//! Parallel processing utilities for linear algebra operations
//!
//! This module provides utilities for managing worker threads across various
//! linear algebra operations, ensuring consistent behavior and optimal performance.
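//!
//! # Examples
//!
//! A minimal sketch of the worker-management API provided by this module:
//!
//! ```
//! use scirs2_linalg::parallel::{set_global_workers, WorkerConfig};
//!
//! // Set a process-wide default worker count.
//! set_global_workers(Some(4));
//!
//! // Or configure a single batched operation with the builder API.
//! WorkerConfig::new().with_workers(2).with_chunksize(128).apply();
//!
//! // Restore the system default.
//! set_global_workers(None);
//! ```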

use std::sync::Mutex;

// Submodules for advanced parallel processing
pub mod thread_pools;
pub mod work_stealing;

// Extracted module declarations
pub mod adaptive;
pub mod advanced_work_stealing;
pub mod affinity;
pub mod algorithms;
pub mod iter;
pub mod numa;
pub mod scheduler;
pub mod thread_pool;

// Re-export submodule types
pub use thread_pools::{
    get_global_manager, AdvancedPerformanceStats, AdvancedPerformanceThreadPool,
    AdvancedThreadPoolConfig, AffinityStrategy, AnomalySeverity, AnomalyType,
    CacheAllocationPolicy, DecompositionType, DynamicSizingConfig, DynamicThreadManager,
    IterativeSolverType, MemoryMetrics, MonitoringConfig, OperationType, PerformanceAnomaly,
    PredictionModelParams, ProfileMetrics, ResourceIsolationConfig, ResourceUsagePattern,
    ScalingDecision, ScalingReason, ScopedThreadPool, ThreadPoolConfig, ThreadPoolManager,
    ThreadPoolProfile, ThreadPoolProfiler, ThreadPoolStats, WorkloadAdaptationConfig,
    WorkloadCharacteristics, WorkloadPattern, WorkloadPredictor,
};
pub use work_stealing::{
    AdaptiveChunking, AdaptiveChunkingStats, CacheAwareStrategy, CacheAwareWorkStealer,
    CacheLocalityOptimizer, CacheOptimizationRecommendations, ChunkPerformance,
    LoadBalancingParams, MatrixOperationType, MemoryAccessPattern, NumaTopology,
    OptimizedSchedulerStats, OptimizedWorkStealingScheduler, PerformanceMonitor, PerformanceStats,
    SchedulerStats, StealingStrategy, WorkComplexity, WorkItem, WorkPriority,
    WorkStealingScheduler,
};

// Re-export matrix operations
pub use work_stealing::matrix_ops::{
    parallel_band_solve, parallel_block_gemm, parallel_cholesky_work_stealing,
    parallel_eigvalsh_work_stealing, parallel_gemm_work_stealing, parallel_hessenberg_reduction,
    parallel_lu_work_stealing, parallel_matvec_work_stealing, parallel_power_iteration,
    parallel_qr_work_stealing, parallel_svd_work_stealing,
};

// Re-export cache-aware operations
pub use work_stealing::parallel_gemm_cache_aware;

/// Global worker configuration
static GLOBAL_WORKERS: Mutex<Option<usize>> = Mutex::new(None);

/// Set the global worker thread count for all operations
///
/// This affects operations that don't explicitly specify a worker count.
/// If set to None, operations will use system defaults.
/// Setting a count also exports the `OMP_NUM_THREADS` environment variable so
/// that OpenMP-backed backends observe the same limit.
///
/// # Arguments
///
/// * `workers` - Number of worker threads (None = use system default)
///
/// # Examples
///
/// ```
/// use scirs2_linalg::parallel::set_global_workers;
///
/// // Use 4 threads for all operations
/// set_global_workers(Some(4));
///
/// // Reset to system default
/// set_global_workers(None);
/// ```
#[allow(dead_code)]
pub fn set_global_workers(workers: Option<usize>) {
    if let Ok(mut global) = GLOBAL_WORKERS.lock() {
        *global = workers;

        // Set the OpenMP environment variable if a count was specified
        if let Some(num_workers) = workers {
            std::env::set_var("OMP_NUM_THREADS", num_workers.to_string());
        } else {
            // Remove the environment variable to fall back to the system default
            std::env::remove_var("OMP_NUM_THREADS");
        }
    }
}

/// Get the current global worker thread count
///
/// # Returns
///
/// * Current global worker count (None = system default)
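///
/// # Examples
///
/// A simple round trip through [`set_global_workers`]:
///
/// ```
/// use scirs2_linalg::parallel::{get_global_workers, set_global_workers};
///
/// set_global_workers(Some(4));
/// assert_eq!(get_global_workers(), Some(4));
///
/// set_global_workers(None);
/// assert_eq!(get_global_workers(), None);
/// ```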
#[allow(dead_code)]
pub fn get_global_workers() -> Option<usize> {
    GLOBAL_WORKERS.lock().ok().and_then(|global| *global)
}

/// Configure worker threads for an operation
///
/// This function determines the appropriate number of worker threads to use,
/// considering both the operation-specific setting and the global configuration.
/// An operation-specific count takes precedence over the global setting.
///
/// # Arguments
///
/// * `workers` - Operation-specific worker count (None = defer to the global setting)
///
/// # Returns
///
/// * Effective worker count to use (None = system default)
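///
/// # Examples
///
/// A sketch of the precedence rules:
///
/// ```
/// use scirs2_linalg::parallel::{configure_workers, set_global_workers};
///
/// set_global_workers(Some(8));
///
/// // An operation-specific count takes precedence over the global setting.
/// assert_eq!(configure_workers(Some(2)), Some(2));
///
/// // Without an operation-specific count, the global setting is used.
/// assert_eq!(configure_workers(None), Some(8));
///
/// set_global_workers(None);
/// ```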
#[allow(dead_code)]
pub fn configure_workers(workers: Option<usize>) -> Option<usize> {
    match workers {
        Some(count) => {
            // Operation-specific setting takes precedence
            std::env::set_var("OMP_NUM_THREADS", count.to_string());
            Some(count)
        }
        None => {
            // Use the global setting if available
            let global_workers = get_global_workers();
            if let Some(count) = global_workers {
                std::env::set_var("OMP_NUM_THREADS", count.to_string());
            }
            global_workers
        }
    }
}

/// Worker configuration for batched operations
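///
/// # Examples
///
/// Builder-style configuration:
///
/// ```
/// use scirs2_linalg::parallel::WorkerConfig;
///
/// let config = WorkerConfig::new()
///     .with_workers(4)
///     .with_threshold(500)
///     .with_chunksize(128);
/// assert_eq!(config.workers, Some(4));
///
/// // Apply the worker count for the current operation.
/// config.apply();
/// ```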
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Number of worker threads (None = system default)
    pub workers: Option<usize>,
    /// Minimum problem size above which parallel processing is used
    pub parallel_threshold: usize,
    /// Chunk size for batched operations
    pub chunksize: usize,
}

impl Default for WorkerConfig {
    fn default() -> Self {
        Self {
            workers: None,
            parallel_threshold: 1000,
            chunksize: 64,
        }
    }
}

impl WorkerConfig {
    /// Create a new worker configuration
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the number of worker threads
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.workers = Some(workers);
        self
    }

    /// Set the parallel processing threshold
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.parallel_threshold = threshold;
        self
    }

    /// Set the chunk size for batched operations
    pub fn with_chunksize(mut self, chunksize: usize) -> Self {
        self.chunksize = chunksize;
        self
    }

    /// Apply this configuration for the current operation
    pub fn apply(&self) {
        configure_workers(self.workers);
    }
}

/// Scoped worker configuration
///
/// Temporarily sets the global worker configuration and restores the previous
/// configuration when dropped.
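///
/// # Examples
///
/// The previous configuration is restored when the guard is dropped:
///
/// ```
/// use scirs2_linalg::parallel::{get_global_workers, ScopedWorkers};
///
/// let before = get_global_workers();
/// {
///     let _guard = ScopedWorkers::new(Some(2));
///     assert_eq!(get_global_workers(), Some(2));
/// } // the guard is dropped here
/// assert_eq!(get_global_workers(), before);
/// ```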
pub struct ScopedWorkers {
    previous_workers: Option<usize>,
}

impl ScopedWorkers {
    /// Create a scoped worker configuration
    ///
    /// # Arguments
    ///
    /// * `workers` - Number of worker threads for this scope
    ///
    /// # Returns
    ///
    /// * ScopedWorkers guard that restores the previous configuration on drop
    pub fn new(workers: Option<usize>) -> Self {
        let previous_workers = get_global_workers();
        set_global_workers(workers);
        Self { previous_workers }
    }
}

impl Drop for ScopedWorkers {
    fn drop(&mut self) {
        set_global_workers(self.previous_workers);
    }
}