Skip to main content

numrs2/parallel/
mod.rs

1//! Parallel processing enhancements and workload balancing
2//!
3//! This module provides advanced parallel processing capabilities including:
4//! - **Thread Pool**: Enhanced work-stealing thread pool with adaptive scheduling
5//! - **Scheduler**: Priority-based task scheduler with profiling and cost estimation
6//! - **Load Balancer**: Dynamic load balancing with NUMA awareness
7//! - **Work Stealing**: High-performance work-stealing implementation
8//! - **Parallel Algorithms**: Optimized parallel implementations (sort, scan, reduce, map-reduce, pipeline)
9//!
10//! # Features
11//!
12//! ## Work-Stealing Thread Pool
13//! - Per-thread work-stealing deques for efficient load distribution
14//! - Thread affinity and CPU pinning support
15//! - Adaptive thread count based on workload
16//! - Priority-based task scheduling
17//!
18//! ## Advanced Scheduling
19//! - Adaptive task granularity (automatic splitting of large tasks)
20//! - Task profiling and cost estimation
21//! - Dynamic load balancing
22//! - Task cancellation support
23//!
24//! ## NUMA Awareness
25//! - NUMA-aware allocation and scheduling
26//! - Work stealing across NUMA nodes
27//! - Memory locality optimization
28//!
29//! ## Parallel Algorithms
30//! - Parallel sort (merge sort, quick sort)
31//! - Parallel scan (prefix sum)
32//! - Parallel reduction with custom operations
33//! - Parallel map-reduce operations
34//! - Parallel pipeline processing
35//!
36//! # Example
37//!
38//! ```no_run
39//! use numrs2::parallel::{ThreadPool, ParallelArrayOps, ParallelConfig};
40//!
41//! // Create a thread pool
42//! let pool = ThreadPool::new().expect("Failed to create thread pool");
43//!
44//! // Submit tasks
45//! for i in 0..10 {
46//!     pool.submit(move || {
47//!         println!("Task {} executing", i);
48//!     }).expect("Failed to submit task");
49//! }
50//!
51//! // Wait for completion
52//! pool.wait().expect("Failed to wait");
53//!
54//! // Use parallel algorithms
55//! let config = ParallelConfig::default();
56//! let ops = ParallelArrayOps::new(config).expect("Failed to create parallel ops");
57//!
58//! let data = vec![5, 2, 8, 1, 9];
59//! let mut sorted = data.clone();
60//! ops.parallel_sort(&mut sorted).expect("Failed to sort");
61//! ```
62
63// Core parallel computing modules
64pub mod load_balancer;
65pub mod parallel_algorithms;
66pub mod parallel_allocator;
67pub mod scheduler;
68pub mod thread_pool;
69pub mod work_stealing;
70
71// Re-export load balancer types (NUMA-aware dynamic load balancing)
72pub use load_balancer::{
73    BalancingStrategy, LoadBalancer, LoadBalancingAdvisor, LoadBalancingAnalysis, WorkloadMetrics,
74};
75
76// Re-export parallel algorithm types (map/reduce/filter/sort/scan)
77pub use parallel_algorithms::{
78    parallel_prefix_sum, parallel_scan, ParallelArrayOps, ParallelConfig, ParallelFFT,
79    ParallelMatrixOps, ParallelPipeline, ParallelQuickSort, ScanMode,
80};
81
82// Re-export allocator types (parallel memory management)
83pub use parallel_allocator::{ParallelAllocator, ParallelAllocatorConfig, ThreadLocalAllocator};
84
85// Re-export scheduler types (adaptive task scheduler)
86pub use scheduler::{ParallelScheduler, SchedulerConfig, SchedulerStats, TaskPriority};
87
88// Re-export thread pool types (work-stealing thread pool)
89pub use thread_pool::{Priority, ThreadPool, ThreadPoolConfig, ThreadPoolStats};
90
91// Re-export work stealing types (work-stealing deque implementation)
92pub use work_stealing::{task, PoolStats, Task, TaskResult, WorkStealingConfig, WorkStealingPool};
93
94use crate::error::{NumRs2Error, Result};
95use std::sync::Arc;
96use std::time::Duration;
97
/// Global parallel execution context.
///
/// Bundles the task scheduler, the load balancer and the work-stealing pool.
/// Each component is held behind an [`Arc`] so the accessor methods can hand
/// out shared handles without copying the component itself.
///
/// NOTE: struct fields are dropped in declaration order (scheduler, then load
/// balancer, then work-stealing pool); consider that before reordering them.
pub struct ParallelContext {
    /// Task scheduler; shut down by `ParallelContext::shutdown` after the pool.
    scheduler: Arc<ParallelScheduler>,
    /// Load balancer; queried by `ParallelContext::workload_stats`.
    load_balancer: Arc<LoadBalancer>,
    /// Work-stealing pool; shut down first by `ParallelContext::shutdown`.
    work_stealing_pool: Arc<WorkStealingPool>,
}
104
105impl ParallelContext {
106    /// Create a new parallel context with optimal configuration for the system
107    pub fn new() -> Result<Self> {
108        let num_cores = std::thread::available_parallelism()
109            .map(|n| n.get())
110            .unwrap_or(4);
111
112        let scheduler_config = SchedulerConfig::optimal_for_cores(num_cores);
113        let scheduler = Arc::new(ParallelScheduler::new(scheduler_config)?);
114
115        let load_balancer = Arc::new(LoadBalancer::new(BalancingStrategy::Adaptive, num_cores)?);
116
117        let work_stealing_pool = Arc::new(WorkStealingPool::new(num_cores)?);
118
119        Ok(Self {
120            scheduler,
121            load_balancer,
122            work_stealing_pool,
123        })
124    }
125
126    /// Create a parallel context with custom configuration
127    pub fn with_config(
128        scheduler_config: SchedulerConfig,
129        balancing_strategy: BalancingStrategy,
130        num_threads: usize,
131    ) -> Result<Self> {
132        let scheduler = Arc::new(ParallelScheduler::new(scheduler_config)?);
133        let load_balancer = Arc::new(LoadBalancer::new(balancing_strategy, num_threads)?);
134        let work_stealing_pool = Arc::new(WorkStealingPool::new(num_threads)?);
135
136        Ok(Self {
137            scheduler,
138            load_balancer,
139            work_stealing_pool,
140        })
141    }
142
143    /// Get the scheduler
144    pub fn scheduler(&self) -> &Arc<ParallelScheduler> {
145        &self.scheduler
146    }
147
148    /// Get the load balancer
149    pub fn load_balancer(&self) -> &Arc<LoadBalancer> {
150        &self.load_balancer
151    }
152
153    /// Get the work-stealing pool
154    pub fn work_stealing_pool(&self) -> &Arc<WorkStealingPool> {
155        &self.work_stealing_pool
156    }
157
158    /// Shutdown the parallel context gracefully
159    pub fn shutdown(&self) -> Result<()> {
160        self.work_stealing_pool.shutdown()?;
161        self.scheduler.shutdown()?;
162        Ok(())
163    }
164
165    /// Get current workload statistics
166    pub fn workload_stats(&self) -> WorkloadMetrics {
167        self.load_balancer.current_metrics()
168    }
169}
170
171impl Default for ParallelContext {
172    fn default() -> Self {
173        Self::new().unwrap_or_else(|_| {
174            // Fallback: create a minimal parallel context
175            // This should rarely happen, but prevents panics
176            let num_cores = 1; // Conservative fallback
177            let scheduler_config = SchedulerConfig::optimal_for_cores(num_cores);
178
179            // Create components with minimal configuration
180            // If any of these fail, we have a serious system issue
181            let scheduler = ParallelScheduler::new(scheduler_config)
182                .ok()
183                .map(Arc::new)
184                .unwrap_or_else(|| {
185                    // Last resort: create a scheduler with absolute minimal config
186                    Arc::new(
187                        ParallelScheduler::new(SchedulerConfig {
188                            num_threads: 1,
189                            max_queue_size: 100,
190                            enable_thread_affinity: false,
191                            enable_adaptive_scheduling: false,
192                            time_slice_ms: 10,
193                            work_stealing_threshold: 5,
194                            cache_aware_scheduling: false,
195                        })
196                        .unwrap_or_else(|_| {
197                            panic!("Cannot create even minimal ParallelScheduler - system unusable")
198                        }),
199                    )
200                });
201
202            let load_balancer = LoadBalancer::new(BalancingStrategy::Adaptive, num_cores)
203                .ok()
204                .map(Arc::new)
205                .unwrap_or_else(|| panic!("Cannot create LoadBalancer - system unusable"));
206
207            let work_stealing_pool = WorkStealingPool::new(num_cores)
208                .ok()
209                .map(Arc::new)
210                .unwrap_or_else(|| panic!("Cannot create WorkStealingPool - system unusable"));
211
212            Self {
213                scheduler,
214                load_balancer,
215                work_stealing_pool,
216            }
217        })
218    }
219}
220
221lazy_static::lazy_static! {
222    /// Thread-safe global parallel context instance
223    static ref GLOBAL_PARALLEL_CONTEXT: std::sync::Mutex<Option<Arc<ParallelContext>>> =
224        std::sync::Mutex::new(None);
225
226    /// Global persistent thread pool for reuse across all parallel operations
227    /// This eliminates thread creation/destruction overhead
228    static ref GLOBAL_THREAD_POOL: Arc<ThreadPool> = {
229        let num_threads = std::env::var("NUMRS2_THREAD_COUNT")
230            .ok()
231            .and_then(|s| s.parse().ok())
232            .or_else(|| std::thread::available_parallelism().ok().map(|n| n.get()))
233            .unwrap_or(2); // Default to 2 threads (sweet spot from benchmarks)
234
235        let config = ThreadPoolConfig {
236            num_threads: Some(num_threads),
237            enable_thread_pinning: false,
238            adaptive_threads: false,
239            min_threads: 1,
240            max_threads: num_threads,
241            queue_capacity: 10000,
242            steal_interval: Duration::from_micros(100),
243            idle_timeout: Duration::from_millis(100),
244        };
245
246        Arc::new(ThreadPool::with_config(config).unwrap_or_else(|_| {
247            // Fallback to minimal pool
248            ThreadPool::with_config(ThreadPoolConfig {
249                num_threads: Some(1),
250                ..Default::default()
251            }).expect("Failed to create fallback thread pool")
252        }))
253    };
254}
255
256/// Initialize the global parallel context
257pub fn initialize_parallel_context() -> Result<()> {
258    let context = Arc::new(ParallelContext::new()?);
259    let mut global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
260        NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
261    })?;
262    *global = Some(context);
263    Ok(())
264}
265
266/// Get the global parallel context
267pub fn global_parallel_context() -> Result<Arc<ParallelContext>> {
268    let global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
269        NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
270    })?;
271    global.clone().ok_or_else(|| {
272        NumRs2Error::RuntimeError("Global parallel context not initialized".to_string())
273    })
274}
275
276/// Shutdown the global parallel context
277pub fn shutdown_parallel_context() -> Result<()> {
278    let mut global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
279        NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
280    })?;
281    if let Some(context) = global.take() {
282        context.shutdown()?;
283    }
284    Ok(())
285}
286
287/// Get the global persistent thread pool
288///
289/// This pool is initialized once and reused across all parallel operations
290/// to eliminate thread creation/destruction overhead.
291pub fn global_thread_pool() -> Arc<ThreadPool> {
292    Arc::clone(&GLOBAL_THREAD_POOL)
293}
294
295/// Get the number of threads in the global thread pool
296pub fn global_thread_count() -> usize {
297    GLOBAL_THREAD_POOL.num_threads()
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_context_creation() {
        let context =
            ParallelContext::new().expect("ParallelContext creation should succeed in test");
        assert!(context.scheduler.num_threads() > 0);
        assert!(context.load_balancer.num_workers() > 0);
    }

    #[test]
    fn test_parallel_context_with_config_and_shutdown() {
        // Exercises the explicit-configuration constructor and the public
        // accessors, then shuts down a context that was never made global.
        let num_threads = 2;
        let context = ParallelContext::with_config(
            SchedulerConfig::optimal_for_cores(num_threads),
            BalancingStrategy::Adaptive,
            num_threads,
        )
        .expect("ParallelContext::with_config should succeed in test");
        assert!(context.scheduler().num_threads() > 0);
        assert!(context.load_balancer().num_workers() > 0);
        context
            .shutdown()
            .expect("ParallelContext::shutdown should succeed in test");
    }

    #[test]
    fn test_global_context_initialization() {
        initialize_parallel_context().expect("initialize_parallel_context should succeed in test");
        let context =
            global_parallel_context().expect("global_parallel_context should succeed in test");
        assert!(context.scheduler.num_threads() > 0);
        shutdown_parallel_context().expect("shutdown_parallel_context should succeed in test");
    }

    #[test]
    fn test_global_thread_pool_is_shared() {
        // The global pool is created once; every call hands out the same instance.
        let first = global_thread_pool();
        let second = global_thread_pool();
        assert!(Arc::ptr_eq(&first, &second));
        assert_eq!(global_thread_count(), first.num_threads());
    }

    #[test]
    fn test_workload_stats() {
        let context =
            ParallelContext::new().expect("ParallelContext creation should succeed in test");
        let stats = context.workload_stats();
        assert_eq!(stats.active_tasks, 0);
        assert!(stats.total_throughput >= 0.0);
    }
}